web_rpc/
lib.rs

1//! The `web-rpc` create is a library for performing RPCs (remote proceedure calls) between
2//! browsing contexts, web workers, and channels. It allows you to define an RPC using a trait
3//! similar to Google's [tarpc](https://github.com/google/tarpc) and will transparently
4//! handle the serialization and deserialization of the arguments. Moreover, it can post
5//! anything that implements [`AsRef<JsValue>`](https://docs.rs/wasm-bindgen/latest/wasm_bindgen/struct.JsValue.html) and also supports transferring ownership.
6//!
7//! ## Quick start
8//! To get started define a trait for your RPC service as follows. Annnotate this trait with the
9//! `service` procedural macro that is exported by this crate:
10//! ```rust
11//! #[web_rpc::service]
12//! pub trait Calculator {
13//!     fn add(left: u32, right: u32) -> u32;
14//! }
15//! ```
16//! This macro will generate the structs `CalculatorClient`, `CalculatorService`, and a new trait
17//! `Calculator` that you can use to implement the service as follows:
18//! ```rust
19//! struct CalculatorServiceImpl;
20//!
21//! impl Calculator for CalculatorServiceImpl {
22//!     fn add(&self, left: u32, right: u32) -> u32 {
23//!         left + right
24//!     }
25//! }
26//! ```
27//! Note that the version of the trait emitted from the macro adds a `&self` receiver. Although not
28//! used in this example, this is useful when we want the RPC to modify some state (via interior
29//! mutability). Now that we have defined our RPC, let's create a client and server for it! In this
30//! example, we will use [`MessageChannel`](https://docs.rs/web-sys/latest/web_sys/struct.MessageChannel.html)
31//! since it is easy to construct and test, however, a more common case would be to construct the
32//! channel from a [`Worker`](https://docs.rs/web-sys/latest/web_sys/struct.Worker.html) or a
33//! [`DedicatedWorkerGlobalScope`](https://docs.rs/web-sys/latest/web_sys/struct.DedicatedWorkerGlobalScope.html).
34//! Let's start by defining the server:
35//! ```rust
36//! // create a MessageChannel
37//! let channel = web_sys::MessageChannel::new();
38//! // Create two interfaces from the ports
39//! let (server_interface, client_interface) = futures_util::future::join(
40//!     web_rpc::Interface::new(channel.port1()),
41//!     web_rpc::Interface::new(channel.port2()),
42//! ).await;
43//! // create a server with the first interface
44//! let server = web_rpc::Builder::new(server_interface)
45//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
46//!     .build();
47//! // spawn the server
48//! wasm_bindgen_futures::spawn_local(server);
49//! ```
50//! [`Interface::new`] is async since there is no way to synchronously check whether a channel or
51//! a worker is ready to receive messages. To workaround this, temporary listeners are attached to
52//! determine when a channel is ready for communication. The server returned by the build method is
53//! a future that can be added to the browser's event loop using
54//! [`wasm_bindgen_futures::spawn_local`], however, this will run the server indefinitely. For more
55//! control, consider wrapping the server with [`futures_util::FutureExt::remote_handle`] before
56//! spawning it, which will shutdown the server once the handle has been dropped. Moving onto the
57//! client:
58//! ```rust
59//! // create a client using the second interface
60//! let client = web_rpc::Builder::new(client_interface)
61//!     .with_client::<CalculatorClient>()
62//!     .build();
63//! /* call `add` */
64//! assert_eq!(client.add(41, 1).await, 42);
65//! ```
66//! That is it! Underneath the hood, the client will serialize its arguments using bincode and
67//! transfer the bytes to server. The server will deserialize those arguments and run
68//! `<CalculatorServiceImpl as Calculator>::add` before returning the result to the client. Note
69//! that we are only awaiting the response of the call to `add`, the request itself is sent
70//! synchronously before we await anything.
71//!
72//! ## Advanced examples
73//! Now that we have the basic idea of how define an RPC trait and set up a server and client, let's
74//! dive into some of the more advanced features of this library!
75//!
76//! ### Synchronous and asynchronous RPC methods
77//! Server methods can be asynchronous! That is, you can define the following RPC trait and service
78//! implementation:
79//! ```rust
80//! #[web_rpc::service]
81//! pub trait Sleep {
82//!     async fn sleep(interval: Duration);
83//! }
84//!
85//! struct SleepServiceImpl;
86//! impl Sleep for SleepServiceImpl {
87//!     async fn sleep(&self, interval: Duration) -> bool {
88//!         gloo_timers::future::sleep(interval).await;
89//!         // sleep completed (was not cancelled)
90//!         true
91//!     }
92//! }
93//! ```
94//! Asynchronous RPC methods are run concurrently on the server and also support cancellation if the
95//! future on the client side is dropped. However, such a future is only returned from a client
96//! method if the RPC returns a value. Otherwise the RPC is considered a notification.
97//!
98//! ### Notifications
99//! Notifications are RPCs that do not return anything. On the client side, the method is completely
100//! synchronous and also returns nothing. This setup is useful if you need to communicate with
101//! another part of your application but cannot yield to the event loop.
102//!
103//! The implication of this, however, is that even if the server method is asynchronous, we are
104//! unable to cancel it from the client side since we do not have a future that can be dropped.
105//!
106//! ### Posting and transferring Javascript types
107//! In the example above, we discussed how the client serializes its arguments before sending them
108//! to the server. This approach is convenient, but how do send web types such as a
109//! `WebAssembly.Module` or an `OffscreenCanvas` that have no serializable representation? Well, we
110//! are in luck since this happens to be one of the key features of this crate. Consider the
111//! following RPC trait:
112//! ```rust
113//! #[web_rpc::service]
114//! pub trait Concat {
115//!     #[post(left, right, return)]
116//!     fn concat_with_space(
117//!         left: js_sys::JsString,
118//!         right: js_sys::JsString
119//!     ) -> js_sys::JsString;
120//! }
121//! ```
122//! All we have done is added the `post` attribute to the method and listed the arguments that we
123//! would like to be posted to the other side. Under the hood, the implementation of the client will
124//! then skip these arguments during serialization and just append them after the serialized message
125//! to the array that will be posted. As shown above, this also works for the return type by just
126//! specifying `return` in the post attribute. For web types that need to be transferred, we simply
127//! wrap them in `transfer` as follows:
128//! ```rust
129//! #[web_rpc::service]
130//! pub trait GameEngine {
131//!     #[post(transfer(canvas))]
132//!     fn send_canvas(
133//!         canvas: js_sys::OffscreenCanvas,
134//!     );
135//! }
136//! ```
137//! ### Bi-directional RPC
138//! In the original example, we created a server on the first port of the message channel and a
139//! client on the second port. However, it is possible to define both a client and a server on each
140//! side, enabling bi-directional RPC. This is particularly useful if we want to send and receive
141//! messages from a worker without sending it a seperate channel for the bi-directional
142//! communication. Our original example can be extended as follows:
143//! ```rust
144//! /* create channel */
145//! let channel = web_sys::MessageChannel::new().unwrap();
146//! let (interface1, interface2) = futures_util::future::join(
147//!     web_rpc::Interface::new(channel.port1()),
148//!     web_rpc::Interface::new(channel.port2()),
149//! ).await;
150//! /* create server1 and client1 */
151//! let (client1, server1) = web_rpc::Builder::new(interface1)
152//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
153//!     .with_client::<CalculatorClient>()
154//!     .build();
155//! /* create server2 and client2 */
156//! let (client2, server2) = web_rpc::Builder::new(interface2)
157//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
158//!     .with_client::<CalculatorClient>()
159//!     .build();
160//! ```
161
162use std::{
163    cell::RefCell,
164    marker::PhantomData,
165    pin::Pin,
166    rc::Rc,
167    task::{Context, Poll},
168};
169
170use futures_channel::mpsc;
171use futures_core::{future::LocalBoxFuture, Future};
172use futures_util::{FutureExt, StreamExt};
173use gloo_events::EventListener;
174use js_sys::{ArrayBuffer, Uint8Array};
175use serde::{de::DeserializeOwned, Deserialize, Serialize};
176use wasm_bindgen::JsCast;
177
178#[doc(hidden)]
179pub use bincode;
180#[doc(hidden)]
181pub use futures_channel;
182#[doc(hidden)]
183pub use futures_core;
184#[doc(hidden)]
185pub use futures_util;
186#[doc(hidden)]
187pub use gloo_events;
188#[doc(hidden)]
189pub use js_sys;
190#[doc(hidden)]
191pub use pin_utils;
192#[doc(hidden)]
193pub use serde;
194#[doc(hidden)]
195pub use wasm_bindgen;
196
197pub use web_rpc_macro::service;
198
199pub mod client;
200pub mod interface;
201pub mod port;
202#[doc(hidden)]
203pub mod service;
204
205pub use interface::Interface;
206
207#[doc(hidden)]
208#[derive(Serialize, Deserialize)]
209pub enum Message<Request, Response> {
210    Request(usize, Request),
211    Abort(usize),
212    Response(usize, Response),
213}
214
215/// This struct allows one to configure the RPC interface prior to creating it.
216/// To get an instance of this struct, call [`Builder<C, S>::new`] with
217/// an [`Interface`].
218pub struct Builder<C, S> {
219    client: PhantomData<C>,
220    service: S,
221    interface: Interface,
222}
223
224impl Builder<(), ()> {
225    /// Create a new builder from an [`Interface`]
226    pub fn new(interface: Interface) -> Self {
227        Self {
228            interface,
229            client: PhantomData::<()>,
230            service: (),
231        }
232    }
233}
234
235impl<C> Builder<C, ()> {
236    /// Configure the RPC interface with a service that implements methods
237    /// that can be called from the other side of the channel. To use this method,
238    /// you need to specify the type `S` which is the service type generated by the
239    /// attribute macro [`macro@service`]. The implementation parameter is then an
240    /// instance of something that implements the trait to which to applied the
241    /// [`macro@service`] macro. For example, if you have a trait `Calculator` to
242    /// which you have applied [`macro@service`], you would use this method as follows:
243    /// ```
244    /// struct CalculatorServiceImpl;
245    /// impl Calculator for CalculatorServiceImpl { /* add Calculator's methods */}
246    /// let server = Builder::new(some_interface)
247    ///     .with_service<CalculatorService<_>>(CalculatorServiceImpl)
248    ///     .build();
249    /// ```
250    pub fn with_service<S: service::Service>(self, implementation: impl Into<S>) -> Builder<C, S> {
251        let service = implementation.into();
252        let Builder {
253            interface, client, ..
254        } = self;
255        Builder {
256            interface,
257            client,
258            service,
259        }
260    }
261}
262
263impl<S> Builder<(), S> {
264    /// Configure the RPC interface with a client that allows you to execute RPCs on the
265    /// server. The builder will automatically instansiate the client for you, you just
266    /// need to provide the type which is generated via the [`macro@service`] attribute
267    /// macro. For example, if you had a trait `Calculator` to which you applied the
268    /// [`macro@service`] attribute macro, the macro would have generated a `CalculatorClient`
269    /// struct which you can use as the `C` in this function.
270    pub fn with_client<C: client::Client>(self) -> Builder<C, S> {
271        let Builder {
272            interface, service, ..
273        } = self;
274        Builder {
275            interface,
276            client: PhantomData::<C>,
277            service,
278        }
279    }
280}
281
282/// `Server` is the server that is returned from the [`Builder::build`] method given
283/// you configured the RPC interface with a service. Note that `Server` implements future and needs
284/// to be polled in order to execute and respond to inbound RPC requests.
285#[must_use = "Server must be polled in order for RPC requests to be executed"]
286pub struct Server {
287    _listener: Rc<EventListener>,
288    task: LocalBoxFuture<'static, ()>,
289}
290
291impl Future for Server {
292    type Output = ();
293
294    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
295        self.task.poll_unpin(cx)
296    }
297}
298
299impl<C> Builder<C, ()>
300where
301    C: client::Client + From<client::Configuration<C::Request, C::Response>> + 'static,
302    <C as client::Client>::Response: DeserializeOwned,
303    <C as client::Client>::Request: Serialize,
304{
305    /// Build function for client-only RPC interfaces.
306    pub fn build(self) -> C {
307        let Builder {
308            interface:
309                Interface {
310                    port,
311                    listener,
312                    mut messages_rx,
313                },
314            ..
315        } = self;
316        let client_callback_map: Rc<RefCell<client::CallbackMap<C::Response>>> = Default::default();
317        let client_callback_map_cloned = client_callback_map.clone();
318        let dispatcher = async move {
319            while let Some(array) = messages_rx.next().await {
320                let message =
321                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
322                match bincode::deserialize::<Message<(), C::Response>>(&message).unwrap() {
323                    Message::Response(seq_id, response) => {
324                        if let Some(callback_tx) =
325                            client_callback_map_cloned.borrow_mut().remove(&seq_id)
326                        {
327                            let _ = callback_tx.send((response, array));
328                        }
329                    }
330                    _ => panic!("client received a server message"),
331                }
332            }
333        }
334        .boxed_local()
335        .shared();
336        let port_cloned = port.clone();
337        let abort_sender = move |seq_id: usize| {
338            let abort = Message::<C::Request, ()>::Abort(seq_id);
339            let abort = bincode::serialize(&abort).unwrap();
340            let buffer = js_sys::Uint8Array::from(&abort[..]).buffer();
341            let post_args = js_sys::Array::of1(&buffer);
342            let transfer_args = js_sys::Array::of1(&buffer);
343            port_cloned
344                .post_message(&post_args, &transfer_args)
345                .unwrap();
346        };
347        let request_serializer = |seq_id: usize, request: C::Request| {
348            let request = Message::<C::Request, ()>::Request(seq_id, request);
349            bincode::serialize(&request).unwrap()
350        };
351        C::from((
352            client_callback_map,
353            port,
354            Rc::new(listener),
355            dispatcher,
356            Rc::new(request_serializer),
357            Rc::new(abort_sender),
358        ))
359    }
360}
361
362impl<S> Builder<(), S>
363where
364    S: service::Service + 'static,
365    <S as service::Service>::Request: DeserializeOwned,
366    <S as service::Service>::Response: Serialize,
367{
368    /// Build function for server-only RPC interfaces.
369    pub fn build(self) -> Server {
370        let Builder {
371            service,
372            interface:
373                Interface {
374                    port,
375                    listener,
376                    mut messages_rx,
377                },
378            ..
379        } = self;
380        let (server_requests_tx, server_requests_rx) = mpsc::unbounded();
381        let (abort_requests_tx, abort_requests_rx) = mpsc::unbounded();
382        let dispatcher = async move {
383            while let Some(array) = messages_rx.next().await {
384                let message =
385                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
386                match bincode::deserialize::<Message<S::Request, ()>>(&message).unwrap() {
387                    Message::Request(seq_id, request) => server_requests_tx
388                        .unbounded_send((seq_id, request, array))
389                        .unwrap(),
390                    Message::Abort(seq_id) => abort_requests_tx.unbounded_send(seq_id).unwrap(),
391                    _ => panic!("server received a client message"),
392                }
393            }
394        }
395        .boxed_local()
396        .shared();
397        Server {
398            _listener: Rc::new(listener),
399            task: service::task::<S, ()>(
400                service,
401                port,
402                dispatcher,
403                server_requests_rx,
404                abort_requests_rx,
405            )
406            .boxed_local(),
407        }
408    }
409}
410
411impl<C, S> Builder<C, S>
412where
413    C: client::Client + From<client::Configuration<C::Request, C::Response>> + 'static,
414    S: service::Service + 'static,
415    <S as service::Service>::Request: DeserializeOwned,
416    <S as service::Service>::Response: Serialize,
417    <C as client::Client>::Request: Serialize,
418    <C as client::Client>::Response: DeserializeOwned,
419{
420    /// Build function for client-server RPC interfaces.
421    pub fn build(self) -> (C, Server) {
422        let Builder {
423            service: server,
424            interface:
425                Interface {
426                    port,
427                    listener,
428                    mut messages_rx,
429                },
430            ..
431        } = self;
432        let client_callback_map: Rc<RefCell<client::CallbackMap<C::Response>>> = Default::default();
433        let (server_requests_tx, server_requests_rx) = mpsc::unbounded();
434        let (abort_requests_tx, abort_requests_rx) = mpsc::unbounded();
435        let client_callback_map_cloned = client_callback_map.clone();
436        let dispatcher = async move {
437            while let Some(array) = messages_rx.next().await {
438                let message = array.shift().dyn_into::<ArrayBuffer>().unwrap();
439                let message = Uint8Array::new(&message).to_vec();
440                match bincode::deserialize::<Message<S::Request, C::Response>>(&message).unwrap() {
441                    Message::Response(seq_id, response) => {
442                        if let Some(callback_tx) =
443                            client_callback_map_cloned.borrow_mut().remove(&seq_id)
444                        {
445                            let _ = callback_tx.send((response, array));
446                        }
447                    }
448                    Message::Request(seq_id, request) => server_requests_tx
449                        .unbounded_send((seq_id, request, array))
450                        .unwrap(),
451                    Message::Abort(seq_id) => abort_requests_tx.unbounded_send(seq_id).unwrap(),
452                }
453            }
454        }
455        .boxed_local()
456        .shared();
457        let port_cloned = port.clone();
458        let abort_sender = move |seq_id: usize| {
459            let abort = Message::<C::Request, S::Response>::Abort(seq_id);
460            let abort = bincode::serialize(&abort).unwrap();
461            let buffer = js_sys::Uint8Array::from(&abort[..]).buffer();
462            let post_args = js_sys::Array::of1(&buffer);
463            let transfer_args = js_sys::Array::of1(&buffer);
464            port_cloned
465                .post_message(&post_args, &transfer_args)
466                .unwrap();
467        };
468        let request_serializer = |seq_id: usize, request: C::Request| {
469            let request = Message::<C::Request, S::Response>::Request(seq_id, request);
470            bincode::serialize(&request).unwrap()
471        };
472        let listener = Rc::new(listener);
473        let client = C::from((
474            client_callback_map,
475            port.clone(),
476            listener.clone(),
477            dispatcher.clone(),
478            Rc::new(request_serializer),
479            Rc::new(abort_sender),
480        ));
481        let server = Server {
482            _listener: listener,
483            task: service::task::<S, C::Request>(
484                server,
485                port,
486                dispatcher,
487                server_requests_rx,
488                abort_requests_rx,
489            )
490            .boxed_local(),
491        };
492        (client, server)
493    }
494}