//! The `web-rpc` crate is a library for performing RPCs (remote procedure calls) between
//! browsing contexts, web workers, and channels. It allows you to define an RPC using a trait
//! similar to Google's [tarpc](https://github.com/google/tarpc) and will transparently
//! handle the serialization and deserialization of the arguments. Moreover, it can post
//! anything that implements [`AsRef<JsValue>`](https://docs.rs/wasm-bindgen/latest/wasm_bindgen/struct.JsValue.html) and also supports transferring ownership.
//!
//! ## Quick start
//! To get started, define a trait for your RPC service as follows. Annotate this trait with the
//! `service` procedural macro that is exported by this crate:
//! ```rust
//! #[web_rpc::service]
//! pub trait Calculator {
//!     fn add(&self, left: u32, right: u32) -> u32;
//! }
//! ```
//! This macro will generate the structs `CalculatorClient`, `CalculatorService`, and a new trait
//! `Calculator` that you can use to implement the service as follows:
//! ```rust
//! # #[web_rpc::service]
//! # pub trait Calculator {
//! #     fn add(&self, left: u32, right: u32) -> u32;
//! # }
//! struct CalculatorServiceImpl;
//!
//! impl Calculator for CalculatorServiceImpl {
//!     fn add(&self, left: u32, right: u32) -> u32 {
//!         left + right
//!     }
//! }
//! ```
//! Note that the `&self` receiver is required in the trait definition. Although not
//! used in this example, this is useful when we want the RPC to modify some state (via interior
//! mutability). Now that we have defined our RPC, let's create a client and server for it! In this
//! example, we will use [`MessageChannel`](https://docs.rs/web-sys/latest/web_sys/struct.MessageChannel.html)
//! since it is easy to construct and test; however, a more common case would be to construct the
//! channel from a [`Worker`](https://docs.rs/web-sys/latest/web_sys/struct.Worker.html) or a
//! [`DedicatedWorkerGlobalScope`](https://docs.rs/web-sys/latest/web_sys/struct.DedicatedWorkerGlobalScope.html).
//! Let's start by defining the server:
//! ```rust,no_run
//! # #[web_rpc::service]
//! # pub trait Calculator {
//! #     fn add(&self, left: u32, right: u32) -> u32;
//! # }
//! # struct CalculatorServiceImpl;
//! # impl Calculator for CalculatorServiceImpl {
//! #     fn add(&self, left: u32, right: u32) -> u32 { left + right }
//! # }
//! # async fn example() {
//! // create a MessageChannel
//! let channel = web_sys::MessageChannel::new().unwrap();
//! // create two interfaces from the ports
//! let (server_interface, client_interface) = futures_util::future::join(
//!     web_rpc::Interface::new(channel.port1()),
//!     web_rpc::Interface::new(channel.port2()),
//! ).await;
//! // create a server with the first interface
//! let server = web_rpc::Builder::new(server_interface)
//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
//!     .build();
//! // spawn the server
//! wasm_bindgen_futures::spawn_local(server);
//! # }
//! ```
//! [`Interface::new`] is async since there is no way to synchronously check whether a channel or
//! a worker is ready to receive messages. To work around this, temporary listeners are attached to
//! determine when a channel is ready for communication. The server returned by the build method is
//! a future that can be added to the browser's event loop using
//! [`wasm_bindgen_futures::spawn_local`]; however, this will run the server indefinitely. For more
//! control, consider wrapping the server with [`futures_util::FutureExt::remote_handle`] before
//! spawning it, which will shut down the server once the handle has been dropped. Moving on to the
//! client:
//! ```rust,no_run
//! # #[web_rpc::service]
//! # pub trait Calculator {
//! #     fn add(&self, left: u32, right: u32) -> u32;
//! # }
//! # async fn example(client_interface: web_rpc::Interface) {
//! // create a client using the second interface
//! let client = web_rpc::Builder::new(client_interface)
//!     .with_client::<CalculatorClient>()
//!     .build();
//! // call `add`
//! assert_eq!(client.add(41, 1).await, 42);
//! # }
//! ```
//! That is it! Underneath the hood, the client will serialize its arguments using bincode and
//! transfer the bytes to the server. The server will deserialize those arguments and run
//! `<CalculatorServiceImpl as Calculator>::add` before returning the result to the client. Note
//! that we are only awaiting the response of the call to `add`; the request itself is sent
//! synchronously before we await anything.
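/*!
The way a response finds its way back to the right caller can be modelled without any browser
APIs. The sketch below is a simplified, std-only illustration and not the code this crate
generates: `PendingCalls`, `register`, and `dispatch` are hypothetical names, the response type is
fixed to `u32`, and a `std::sync::mpsc` channel stands in for the one-shot callback. The idea it
shows is that each request carries a sequence id, and an incoming response with that id removes
the matching callback from a map and completes it.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the client's map of in-flight calls.
#[derive(Default)]
pub struct PendingCalls {
    next_seq_id: usize,
    callbacks: HashMap<usize, Sender<u32>>,
}

impl PendingCalls {
    /// Register a call; returns its sequence id and the receiving end for the response.
    pub fn register(&mut self) -> (usize, Receiver<u32>) {
        let seq_id = self.next_seq_id;
        self.next_seq_id += 1;
        let (tx, rx) = channel();
        self.callbacks.insert(seq_id, tx);
        (seq_id, rx)
    }

    /// Route a response to the caller waiting on `seq_id`.
    /// Returns `false` if no call with that id is pending or the caller has gone away.
    pub fn dispatch(&mut self, seq_id: usize, response: u32) -> bool {
        match self.callbacks.remove(&seq_id) {
            Some(tx) => tx.send(response).is_ok(),
            None => false,
        }
    }
}
```

Because callbacks are keyed by sequence id, responses may arrive in any order, and a response for
an id that is no longer pending is simply ignored.
*/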
//!
//! ## Advanced examples
//! Now that we have the basic idea of how to define an RPC trait and set up a server and client,
//! let's dive into some of the more advanced features of this library!
//!
//! ### Synchronous and asynchronous RPC methods
//! Server methods can be asynchronous! That is, you can define the following RPC trait and service
//! implementation:
//! ```rust,no_run
//! # use std::time::Duration;
//! #[web_rpc::service]
//! pub trait Sleep {
//!     async fn sleep(&self, interval: Duration);
//! }
//!
//! struct SleepServiceImpl;
//! impl Sleep for SleepServiceImpl {
//!     async fn sleep(&self, interval: Duration) {
//!         gloo_timers::future::sleep(interval).await;
//!     }
//! }
//! ```
//! Asynchronous RPC methods are run concurrently on the server and also support cancellation if the
//! future on the client side is dropped. However, such a future is only returned from a client
//! method if the RPC returns a value. Otherwise the RPC is considered a notification.
//!
//! ### Notifications
//! Notifications are RPCs that do not return anything. On the client side, the method is completely
//! synchronous and also returns nothing. This setup is useful if you need to communicate with
//! another part of your application but cannot yield to the event loop.
//!
//! The implication of this, however, is that even if the server method is asynchronous, we are
//! unable to cancel it from the client side since we do not have a future that can be dropped.
//!
//! ### Posting and transferring JavaScript types
//! In the example above, we discussed how the client serializes its arguments before sending them
//! to the server. This approach is convenient, but how do we send web types such as a
//! `WebAssembly.Module` or an `OffscreenCanvas` that have no serializable representation? Well, we
//! are in luck since this happens to be one of the key features of this crate. Consider the
//! following RPC trait:
//! ```rust
//! #[web_rpc::service]
//! pub trait Concat {
//!     #[post(left, right, return)]
//!     fn concat_with_space(
//!         &self,
//!         left: js_sys::JsString,
//!         right: js_sys::JsString
//!     ) -> js_sys::JsString;
//! }
//! ```
//! All we have done is add the `post` attribute to the method and list the arguments that we
//! would like to be posted to the other side. Under the hood, the implementation of the client will
//! then skip these arguments during serialization and just append them after the serialized message
//! to the array that will be posted. As shown above, this also works for the return type by just
//! specifying `return` in the post attribute. For web types that need to be transferred, we simply
//! wrap them in `transfer` as follows:
//! ```rust
//! #[web_rpc::service]
//! pub trait GameEngine {
//!     #[post(transfer(canvas))]
//!     fn send_canvas(
//!         &self,
//!         canvas: web_sys::OffscreenCanvas,
//!     );
//! }
//! ```
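/*!
To make the layout of a posted message more concrete, here is a small std-only model of it. This
is a sketch under stated assumptions rather than the crate's implementation: `Slot`, `pack`, and
`unpack` are hypothetical names, and posted JavaScript values are modelled as plain `String`s. It
shows a header buffer first, the serialized payload second, and any posted values appended after
them, which the receiving side peels off in the same order.

```rust
/// Hypothetical stand-in for one slot of the posted JavaScript array.
#[derive(Debug, Clone, PartialEq)]
pub enum Slot {
    /// Serialized bytes (the header or the payload).
    Bytes(Vec<u8>),
    /// A value that is posted as-is instead of being serialized.
    Posted(String),
}

/// Build the outgoing array: header, serialized payload, then the posted values.
pub fn pack(header: Vec<u8>, payload: Vec<u8>, posted: Vec<String>) -> Vec<Slot> {
    let mut message = vec![Slot::Bytes(header), Slot::Bytes(payload)];
    message.extend(posted.into_iter().map(Slot::Posted));
    message
}

/// Split a received array into header bytes, payload bytes, and the remaining posted slots.
/// Returns `None` if either of the first two slots is missing or is not a byte buffer.
pub fn unpack(message: Vec<Slot>) -> Option<(Vec<u8>, Vec<u8>, Vec<Slot>)> {
    let mut slots = message.into_iter();
    let header = match slots.next()? {
        Slot::Bytes(bytes) => bytes,
        Slot::Posted(_) => return None,
    };
    let payload = match slots.next()? {
        Slot::Bytes(bytes) => bytes,
        Slot::Posted(_) => return None,
    };
    Some((header, payload, slots.collect()))
}
```

Keeping the posted values out of the serialized payload is what allows types with no serializable
representation to travel alongside ordinary arguments.
*/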
//! ### Borrowed parameters
//! RPC methods can accept borrowed types such as `&str` and `&[u8]`, which are deserialized
//! zero-copy on the server side:
//! ```rust
//! #[web_rpc::service]
//! pub trait Greeter {
//!     fn greet(&self, name: &str, greeting: &str) -> String;
//!     fn count_bytes(&self, data: &[u8]) -> usize;
//! }
//!
//! struct GreeterServiceImpl;
//! impl Greeter for GreeterServiceImpl {
//!     fn greet(&self, name: &str, greeting: &str) -> String {
//!         format!("{greeting}, {name}!")
//!     }
//!     fn count_bytes(&self, data: &[u8]) -> usize {
//!         data.len()
//!     }
//! }
//! ```
//! This avoids unnecessary allocations: the server deserializes directly from the received
//! message bytes without copying into owned `String` or `Vec<u8>` types. On the client side,
//! borrowed parameters are serialized inline before the method returns, so standard Rust
//! lifetime rules apply. Note that only types with serde borrowing support (`&str`, `&[u8]`)
//! benefit from zero-copy deserialization.
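/*!
For readers unfamiliar with zero-copy deserialization, the following std-only sketch illustrates
the general idea. It is not how bincode or this crate encodes data: the wire format (a 4-byte
little-endian length followed by UTF-8 bytes) and the names `write_str` and `read_str` are
assumptions made for the example. The point is that the decoded `&str` borrows from the message
buffer instead of being copied into a new `String`.

```rust
/// Append `text` to `out` as a 4-byte little-endian length followed by its UTF-8 bytes.
pub fn write_str(out: &mut Vec<u8>, text: &str) {
    out.extend_from_slice(&(text.len() as u32).to_le_bytes());
    out.extend_from_slice(text.as_bytes());
}

/// Read one length-prefixed string from the front of `input` without copying.
/// The returned `&str` borrows from `input`; the second value is the unread remainder.
pub fn read_str(input: &[u8]) -> Option<(&str, &[u8])> {
    if input.len() < 4 {
        return None;
    }
    let len = u32::from_le_bytes([input[0], input[1], input[2], input[3]]) as usize;
    let rest = &input[4..];
    if rest.len() < len {
        return None;
    }
    let (text, remainder) = rest.split_at(len);
    Some((std::str::from_utf8(text).ok()?, remainder))
}
```

Since the result is tied to the lifetime of `input`, the buffer has to outlive every string read
from it, which is the same constraint the borrowed parameters of an RPC method are subject to.
*/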
//!
//! ### Streaming
//! Methods can return a stream of items using `impl Stream<Item = T>` as the return type.
//! The macro detects this and generates the appropriate client and server code. On the client
//! side, the method returns a [`client::StreamReceiver<T>`] which implements
//! [`futures_core::Stream`]. On the server side, the return type is preserved as-is:
//! ```rust
//! #[web_rpc::service]
//! pub trait DataSource {
//!     fn stream_data(&self, count: u32) -> impl futures_core::Stream<Item = u32>;
//! }
//!
//! struct DataSourceImpl;
//! impl DataSource for DataSourceImpl {
//!     fn stream_data(&self, count: u32) -> impl futures_core::Stream<Item = u32> {
//!         let (tx, rx) = futures_channel::mpsc::unbounded();
//!         for i in 0..count {
//!             let _ = tx.unbounded_send(i);
//!         }
//!         rx
//!     }
//! }
//! ```
//! Dropping the [`client::StreamReceiver`] sends an abort signal to the server, cancelling the
//! stream. Alternatively, calling [`close`](client::StreamReceiver::close) stops the server
//! while still allowing buffered items to be drained. Streaming methods can also be async and
//! can be combined with the `#[post(return)]` attribute for streaming JavaScript types.
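/*!
The drop-to-cancel behaviour can be pictured with a small std-only model. This is only a sketch of
the pattern, not the actual `StreamReceiver`: `AbortOnDrop` and `recording_abort_sender` are
hypothetical names, and the callback here appends to a log, whereas the `abort_sender` closure
built in this file posts an `Abort` header to the port.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical handle that notifies the other side when it is dropped.
pub struct AbortOnDrop {
    seq_id: usize,
    abort_sender: Rc<dyn Fn(usize)>,
}

impl AbortOnDrop {
    pub fn new(seq_id: usize, abort_sender: Rc<dyn Fn(usize)>) -> Self {
        Self { seq_id, abort_sender }
    }
}

impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        // Signal cancellation for the call or stream identified by `seq_id`.
        (self.abort_sender)(self.seq_id);
    }
}

/// Example helper: returns a log of aborted ids and a callback that appends to it.
pub fn recording_abort_sender() -> (Rc<RefCell<Vec<usize>>>, Rc<dyn Fn(usize)>) {
    let log = Rc::new(RefCell::new(Vec::<usize>::new()));
    let log_cloned = log.clone();
    let sender: Rc<dyn Fn(usize)> =
        Rc::new(move |seq_id: usize| log_cloned.borrow_mut().push(seq_id));
    (log, sender)
}
```

Tying cancellation to `Drop` means no explicit call is needed: letting the handle go out of scope
is enough to tell the other side to stop.
*/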
//!
//! ### Bi-directional RPC
//! In the original example, we created a server on the first port of the message channel and a
//! client on the second port. However, it is possible to define both a client and a server on each
//! side, enabling bi-directional RPC. This is particularly useful if we want to send and receive
//! messages from a worker without sending it a separate channel for the bi-directional
//! communication. Our original example can be extended as follows:
//! ```rust,no_run
//! # #[web_rpc::service]
//! # pub trait Calculator {
//! #     fn add(&self, left: u32, right: u32) -> u32;
//! # }
//! # struct CalculatorServiceImpl;
//! # impl Calculator for CalculatorServiceImpl {
//! #     fn add(&self, left: u32, right: u32) -> u32 { left + right }
//! # }
//! # async fn example() {
//! // create channel
//! let channel = web_sys::MessageChannel::new().unwrap();
//! let (interface1, interface2) = futures_util::future::join(
//!     web_rpc::Interface::new(channel.port1()),
//!     web_rpc::Interface::new(channel.port2()),
//! ).await;
//! // create server1 and client1
//! let (client1, server1) = web_rpc::Builder::new(interface1)
//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
//!     .with_client::<CalculatorClient>()
//!     .build();
//! // create server2 and client2
//! let (client2, server2) = web_rpc::Builder::new(interface2)
//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
//!     .with_client::<CalculatorClient>()
//!     .build();
//! # }
//! ```

use std::{
    cell::RefCell,
    marker::PhantomData,
    pin::Pin,
    rc::Rc,
    task::{Context, Poll},
};

use futures_channel::mpsc;
use futures_core::{future::LocalBoxFuture, Future};
use futures_util::{FutureExt, StreamExt};
use gloo_events::EventListener;
use js_sys::{ArrayBuffer, Uint8Array};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use wasm_bindgen::JsCast;

#[doc(hidden)]
pub use bincode;
#[doc(hidden)]
pub use futures_channel;
#[doc(hidden)]
pub use futures_core;
#[doc(hidden)]
pub use futures_util;
#[doc(hidden)]
pub use gloo_events;
#[doc(hidden)]
pub use js_sys;
#[doc(hidden)]
pub use pin_utils;
#[doc(hidden)]
pub use serde;
#[doc(hidden)]
pub use wasm_bindgen;

pub use web_rpc_macro::service;

pub mod client;
pub mod interface;
pub mod port;
#[doc(hidden)]
pub mod service;

pub use interface::Interface;

#[doc(hidden)]
#[derive(Serialize, Deserialize)]
pub enum MessageHeader {
    Request(usize),
    Abort(usize),
    Response(usize),
    StreamItem(usize),
    StreamEnd(usize),
}

/// This struct allows one to configure the RPC interface prior to creating it.
/// To get an instance of this struct, call [`Builder<C, S>::new`] with
/// an [`Interface`].
pub struct Builder<C, S> {
    client: PhantomData<C>,
    service: S,
    interface: Interface,
}

impl Builder<(), ()> {
    /// Create a new builder from an [`Interface`]
    pub fn new(interface: Interface) -> Self {
        Self {
            interface,
            client: PhantomData::<()>,
            service: (),
        }
    }
}

impl<C> Builder<C, ()> {
    /// Configure the RPC interface with a service that implements methods
    /// that can be called from the other side of the channel. To use this method,
    /// you need to specify the type `S` which is the service type generated by the
    /// attribute macro [`macro@service`]. The implementation parameter is then an
    /// instance of something that implements the trait to which you applied the
    /// [`macro@service`] macro. For example, if you have a trait `Calculator` to
    /// which you have applied [`macro@service`], you would use this method as follows:
    /// ```rust,no_run
    /// # #[web_rpc::service]
    /// # pub trait Calculator {
    /// #     fn add(&self, left: u32, right: u32) -> u32;
    /// # }
    /// # struct CalculatorServiceImpl;
    /// # impl Calculator for CalculatorServiceImpl {
    /// #     fn add(&self, left: u32, right: u32) -> u32 { left + right }
    /// # }
    /// # fn example(some_interface: web_rpc::Interface) {
    /// let server = web_rpc::Builder::new(some_interface)
    ///     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
    ///     .build();
    /// # }
    /// ```
    pub fn with_service<S: service::Service>(self, implementation: impl Into<S>) -> Builder<C, S> {
        let service = implementation.into();
        let Builder {
            interface, client, ..
        } = self;
        Builder {
            interface,
            client,
            service,
        }
    }
}

impl<S> Builder<(), S> {
    /// Configure the RPC interface with a client that allows you to execute RPCs on the
    /// server. The builder will automatically instantiate the client for you; you just
    /// need to provide the type which is generated via the [`macro@service`] attribute
    /// macro. For example, if you had a trait `Calculator` to which you applied the
    /// [`macro@service`] attribute macro, the macro would have generated a `CalculatorClient`
    /// struct which you can use as the `C` in this function.
    pub fn with_client<C: client::Client>(self) -> Builder<C, S> {
        let Builder {
            interface, service, ..
        } = self;
        Builder {
            interface,
            client: PhantomData::<C>,
            service,
        }
    }
}

/// `Server` is the server that is returned from the [`Builder::build`] method, provided that
/// you configured the RPC interface with a service. Note that `Server` implements [`Future`] and
/// needs to be polled in order to execute and respond to inbound RPC requests.
#[must_use = "Server must be polled in order for RPC requests to be executed"]
pub struct Server {
    _listener: Rc<EventListener>,
    task: LocalBoxFuture<'static, ()>,
}

impl Future for Server {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.task.poll_unpin(cx)
    }
}

impl<C> Builder<C, ()>
where
    C: client::Client + From<client::Configuration<C::Response>> + 'static,
    <C as client::Client>::Response: DeserializeOwned,
{
    /// Build function for client-only RPC interfaces.
    pub fn build(self) -> C {
        let Builder {
            interface:
                Interface {
                    port,
                    listener,
                    mut messages_rx,
                },
            ..
        } = self;
        let client_callback_map: Rc<RefCell<client::CallbackMap<C::Response>>> = Default::default();
        let client_callback_map_cloned = client_callback_map.clone();
        let stream_callback_map: Rc<RefCell<client::StreamCallbackMap<C::Response>>> =
            Default::default();
        let stream_callback_map_cloned = stream_callback_map.clone();
        let dispatcher = async move {
            while let Some(array) = messages_rx.next().await {
                let header_bytes =
                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
                let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
                match header {
                    MessageHeader::Response(seq_id) => {
                        let payload_bytes =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
                        if let Some(callback_tx) =
                            client_callback_map_cloned.borrow_mut().remove(&seq_id)
                        {
                            let _ = callback_tx.send((response, array));
                        }
                    }
                    MessageHeader::StreamItem(seq_id) => {
                        let payload_bytes =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
                        if let Some(tx) = stream_callback_map_cloned.borrow().get(&seq_id) {
                            let _ = tx.unbounded_send((response, array));
                        }
                    }
                    MessageHeader::StreamEnd(seq_id) => {
                        stream_callback_map_cloned.borrow_mut().remove(&seq_id);
                    }
                    _ => panic!("client received a server message"),
                }
            }
        }
        .boxed_local()
        .shared();
        let port_cloned = port.clone();
        let abort_sender = move |seq_id: usize| {
            let header = MessageHeader::Abort(seq_id);
            let header_bytes = bincode::serialize(&header).unwrap();
            let buffer = js_sys::Uint8Array::from(&header_bytes[..]).buffer();
            let post_args = js_sys::Array::of1(&buffer);
            let transfer_args = js_sys::Array::of1(&buffer);
            port_cloned
                .post_message(&post_args, &transfer_args)
                .unwrap();
        };
        C::from((
            client_callback_map,
            stream_callback_map,
            port,
            Rc::new(listener),
            dispatcher,
            Rc::new(abort_sender),
        ))
    }
}

impl<S> Builder<(), S>
where
    S: service::Service + 'static,
    <S as service::Service>::Response: Serialize,
{
    /// Build function for server-only RPC interfaces.
    pub fn build(self) -> Server {
        let Builder {
            service,
            interface:
                Interface {
                    port,
                    listener,
                    mut messages_rx,
                },
            ..
        } = self;
        let (server_requests_tx, server_requests_rx) = mpsc::unbounded();
        let (abort_requests_tx, abort_requests_rx) = mpsc::unbounded();
        let dispatcher = async move {
            while let Some(array) = messages_rx.next().await {
                let header_bytes =
                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
                let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
                match header {
                    MessageHeader::Request(seq_id) => {
                        let payload =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        server_requests_tx
                            .unbounded_send((seq_id, payload, array))
                            .unwrap();
                    }
                    MessageHeader::Abort(seq_id) => {
                        abort_requests_tx.unbounded_send(seq_id).unwrap();
                    }
                    _ => panic!("server received a client message"),
                }
            }
        }
        .boxed_local()
        .shared();
        Server {
            _listener: Rc::new(listener),
            task: service::task::<S>(
                service,
                port,
                dispatcher,
                server_requests_rx,
                abort_requests_rx,
            )
            .boxed_local(),
        }
    }
}

impl<C, S> Builder<C, S>
where
    C: client::Client + From<client::Configuration<C::Response>> + 'static,
    S: service::Service + 'static,
    <S as service::Service>::Response: Serialize,
    <C as client::Client>::Response: DeserializeOwned,
{
    /// Build function for client-server RPC interfaces.
    pub fn build(self) -> (C, Server) {
        let Builder {
            service: server,
            interface:
                Interface {
                    port,
                    listener,
                    mut messages_rx,
                },
            ..
        } = self;
        let client_callback_map: Rc<RefCell<client::CallbackMap<C::Response>>> = Default::default();
        let stream_callback_map: Rc<RefCell<client::StreamCallbackMap<C::Response>>> =
            Default::default();
        let (server_requests_tx, server_requests_rx) = mpsc::unbounded();
        let (abort_requests_tx, abort_requests_rx) = mpsc::unbounded();
        let client_callback_map_cloned = client_callback_map.clone();
        let stream_callback_map_cloned = stream_callback_map.clone();
        let dispatcher = async move {
            while let Some(array) = messages_rx.next().await {
                let header_bytes =
                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
                let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
                match header {
                    MessageHeader::Response(seq_id) => {
                        let payload_bytes =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
                        if let Some(callback_tx) =
                            client_callback_map_cloned.borrow_mut().remove(&seq_id)
                        {
                            let _ = callback_tx.send((response, array));
                        }
                    }
                    MessageHeader::StreamItem(seq_id) => {
                        let payload_bytes =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
                        if let Some(tx) = stream_callback_map_cloned.borrow().get(&seq_id) {
                            let _ = tx.unbounded_send((response, array));
                        }
                    }
                    MessageHeader::StreamEnd(seq_id) => {
                        stream_callback_map_cloned.borrow_mut().remove(&seq_id);
                    }
                    MessageHeader::Request(seq_id) => {
                        let payload =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        server_requests_tx
                            .unbounded_send((seq_id, payload, array))
                            .unwrap();
                    }
                    MessageHeader::Abort(seq_id) => {
                        abort_requests_tx.unbounded_send(seq_id).unwrap();
                    }
                }
            }
        }
        .boxed_local()
        .shared();
        let port_cloned = port.clone();
        let abort_sender = move |seq_id: usize| {
            let header = MessageHeader::Abort(seq_id);
            let header_bytes = bincode::serialize(&header).unwrap();
            let buffer = js_sys::Uint8Array::from(&header_bytes[..]).buffer();
            let post_args = js_sys::Array::of1(&buffer);
            let transfer_args = js_sys::Array::of1(&buffer);
            port_cloned
                .post_message(&post_args, &transfer_args)
                .unwrap();
        };
        let listener = Rc::new(listener);
        let client = C::from((
            client_callback_map,
            stream_callback_map,
            port.clone(),
            listener.clone(),
            dispatcher.clone(),
            Rc::new(abort_sender),
        ));
        let server = Server {
            _listener: listener,
            task: service::task::<S>(
                server,
                port,
                dispatcher,
                server_requests_rx,
                abort_requests_rx,
            )
            .boxed_local(),
        };
        (client, server)
    }
}
631}