web_rpc/lib.rs
//! The `web-rpc` crate is a library for performing RPCs (remote procedure calls) between
//! browsing contexts, web workers, and channels. It allows you to define an RPC using a trait
//! similar to Google's [tarpc](https://github.com/google/tarpc) and will transparently
//! handle the serialization and deserialization of the arguments. Moreover, it can post
//! anything that implements [`AsRef<JsValue>`](https://docs.rs/wasm-bindgen/latest/wasm_bindgen/struct.JsValue.html) and also supports transferring ownership.
//!
//! ## Quick start
//! To get started, define a trait for your RPC service as follows. Annotate this trait with the
//! `service` procedural macro that is exported by this crate:
//! ```rust
//! #[web_rpc::service]
//! pub trait Calculator {
//!     fn add(&self, left: u32, right: u32) -> u32;
//! }
//! ```
//! This macro will generate the structs `CalculatorClient`, `CalculatorService`, and a new trait
//! `Calculator` that you can use to implement the service as follows:
//! ```rust
//! # #[web_rpc::service]
//! # pub trait Calculator {
//! #     fn add(&self, left: u32, right: u32) -> u32;
//! # }
//! struct CalculatorServiceImpl;
//!
//! impl Calculator for CalculatorServiceImpl {
//!     fn add(&self, left: u32, right: u32) -> u32 {
//!         left + right
//!     }
//! }
//! ```
//! Note that the `&self` receiver is required in the trait definition. Although not
//! used in this example, this is useful when we want the RPC to modify some state (via interior
//! mutability). Now that we have defined our RPC, let's create a client and server for it! In this
//! example, we will use [`MessageChannel`](https://docs.rs/web-sys/latest/web_sys/struct.MessageChannel.html)
//! since it is easy to construct and test; however, a more common case would be to construct the
//! channel from a [`Worker`](https://docs.rs/web-sys/latest/web_sys/struct.Worker.html) or a
//! [`DedicatedWorkerGlobalScope`](https://docs.rs/web-sys/latest/web_sys/struct.DedicatedWorkerGlobalScope.html).
//! Let's start by defining the server:
//! ```rust,no_run
//! # #[web_rpc::service]
//! # pub trait Calculator {
//! #     fn add(&self, left: u32, right: u32) -> u32;
//! # }
//! # struct CalculatorServiceImpl;
//! # impl Calculator for CalculatorServiceImpl {
//! #     fn add(&self, left: u32, right: u32) -> u32 { left + right }
//! # }
//! # async fn example() {
//! // Create a MessageChannel
//! let channel = web_sys::MessageChannel::new().unwrap();
//! // Create two interfaces from the ports
//! let (server_interface, client_interface) = futures_util::future::join(
//!     web_rpc::Interface::new(channel.port1()),
//!     web_rpc::Interface::new(channel.port2()),
//! ).await;
//! // Create a server with the first interface
//! let server = web_rpc::Builder::new(server_interface)
//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
//!     .build();
//! // Spawn the server
//! wasm_bindgen_futures::spawn_local(server);
//! # }
//! ```
//! [`Interface::new`] is async since there is no way to synchronously check whether a channel or
//! a worker is ready to receive messages. To work around this, temporary listeners are attached to
//! determine when a channel is ready for communication. The server returned by the build method is
//! a future that can be added to the browser's event loop using
//! [`wasm_bindgen_futures::spawn_local`]; however, this will run the server indefinitely. For more
//! control, consider wrapping the server with [`futures_util::FutureExt::remote_handle`] before
//! spawning it, which will shut down the server once the handle has been dropped. Moving on to the
//! client:
//! ```rust,no_run
//! # #[web_rpc::service]
//! # pub trait Calculator {
//! #     fn add(&self, left: u32, right: u32) -> u32;
//! # }
//! # async fn example(client_interface: web_rpc::Interface) {
//! // Create a client using the second interface
//! let client = web_rpc::Builder::new(client_interface)
//!     .with_client::<CalculatorClient>()
//!     .build();
//! /* call `add` */
//! assert_eq!(client.add(41, 1).await, 42);
//! # }
//! ```
//! That is it! Underneath the hood, the client will serialize its arguments using bincode and
//! transfer the bytes to the server. The server will deserialize those arguments and run
//! `<CalculatorServiceImpl as Calculator>::add` before returning the result to the client. Note
//! that we are only awaiting the response of the call to `add`; the request itself is sent
//! synchronously before we await anything.
//!
//! ## Advanced examples
//! Now that we have the basic idea of how to define an RPC trait and set up a server and client,
//! let's dive into some of the more advanced features of this library!
//!
//! ### Synchronous and asynchronous RPC methods
//! Server methods can be asynchronous! That is, you can define the following RPC trait and service
//! implementation:
//! ```rust,no_run
//! # use std::time::Duration;
//! #[web_rpc::service]
//! pub trait Sleep {
//!     async fn sleep(&self, interval: Duration);
//! }
//!
//! struct SleepServiceImpl;
//! impl Sleep for SleepServiceImpl {
//!     async fn sleep(&self, interval: Duration) {
//!         gloo_timers::future::sleep(interval).await;
//!     }
//! }
//! ```
//! Asynchronous RPC methods are run concurrently on the server and also support cancellation if the
//! future on the client side is dropped. However, such a future is only returned from a client
//! method if the RPC returns a value. Otherwise the RPC is considered a notification.
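//!
//! The cancel-on-drop behaviour described above can be pictured with a minimal, std-only
//! sketch. It is not this crate's implementation: `PendingCall`, `Abort`, and `observe_drop`
//! are hypothetical names, and a `std::sync::mpsc` channel stands in for the message port.
//! ```rust
//! use std::sync::mpsc::{channel, Sender};
//!
//! // Hypothetical stand-in for the wire message that cancels a request.
//! #[derive(Debug, PartialEq)]
//! struct Abort(usize);
//!
//! // Hypothetical handle for an in-flight request. Dropping it before the
//! // response has arrived notifies the other side.
//! struct PendingCall {
//!     seq_id: usize,
//!     completed: bool,
//!     abort_tx: Sender<Abort>,
//! }
//!
//! impl Drop for PendingCall {
//!     fn drop(&mut self) {
//!         // Only calls that never completed need to be cancelled.
//!         if !self.completed {
//!             let _ = self.abort_tx.send(Abort(self.seq_id));
//!         }
//!     }
//! }
//!
//! // Create a call, optionally mark it as completed, drop it, and report
//! // what the receiving side would observe.
//! fn observe_drop(seq_id: usize, completed: bool) -> Option<Abort> {
//!     let (abort_tx, abort_rx) = channel();
//!     let call = PendingCall { seq_id, completed, abort_tx };
//!     drop(call);
//!     abort_rx.try_recv().ok()
//! }
//! ```
//! Under these assumptions, `observe_drop(7, false)` yields `Some(Abort(7))`, while
//! `observe_drop(7, true)` yields `None` because a completed call has nothing left to cancel.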
//!
//! ### Notifications
//! Notifications are RPCs that do not return anything. On the client side, the method is completely
//! synchronous and also returns nothing. This setup is useful if you need to communicate with
//! another part of your application but cannot yield to the event loop.
//!
//! The implication of this, however, is that even if the server method is asynchronous, we are
//! unable to cancel it from the client side since we do not have a future that can be dropped.
//!
//! ### Posting and transferring JavaScript types
//! In the example above, we discussed how the client serializes its arguments before sending them
//! to the server. This approach is convenient, but how do we send web types such as a
//! `WebAssembly.Module` or an `OffscreenCanvas` that have no serializable representation? Well, we
//! are in luck since this happens to be one of the key features of this crate. Consider the
//! following RPC trait:
//! ```rust
//! #[web_rpc::service]
//! pub trait Concat {
//!     #[post(left, right, return)]
//!     fn concat_with_space(
//!         &self,
//!         left: js_sys::JsString,
//!         right: js_sys::JsString
//!     ) -> js_sys::JsString;
//! }
//! ```
//! All we have done is add the `post` attribute to the method and list the arguments that we
//! would like to be posted to the other side. Under the hood, the implementation of the client will
//! then skip these arguments during serialization and just append them after the serialized message
//! to the array that will be posted. As shown above, this also works for the return type by just
//! specifying `return` in the post attribute. For web types that need to be transferred, we simply
//! wrap them in `transfer` as follows:
//! ```rust
//! #[web_rpc::service]
//! pub trait GameEngine {
//!     #[post(transfer(canvas))]
//!     fn send_canvas(
//!         &self,
//!         canvas: web_sys::OffscreenCanvas,
//!     );
//! }
//! ```
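//! The "append after the serialized message" layout can be modelled without a browser. The
//! following std-only sketch is an illustration, not the crate's code: `Slot`, `pack`, and
//! `unpack` are hypothetical names, a `VecDeque` stands in for the posted JavaScript array, and
//! string labels stand in for posted JavaScript values.
//! ```rust
//! use std::collections::VecDeque;
//!
//! // Hypothetical stand-in for one element of the posted array.
//! #[derive(Debug, Clone, PartialEq)]
//! enum Slot {
//!     // Serialized bytes (the header or the serialized arguments).
//!     Bytes(Vec<u8>),
//!     // A value posted as-is; the label stands in for a JavaScript value.
//!     Posted(String),
//! }
//!
//! // Sender side: header first, then the serialized arguments, then the
//! // posted values in argument order.
//! fn pack(header: Vec<u8>, payload: Vec<u8>, posted: &[&str]) -> VecDeque<Slot> {
//!     let mut message = VecDeque::new();
//!     message.push_back(Slot::Bytes(header));
//!     message.push_back(Slot::Bytes(payload));
//!     message.extend(posted.iter().map(|label| Slot::Posted(label.to_string())));
//!     message
//! }
//!
//! // Receiver side: shift the two byte slots off the front; whatever remains
//! // are the posted values.
//! fn unpack(mut message: VecDeque<Slot>) -> Option<(Vec<u8>, Vec<u8>, Vec<Slot>)> {
//!     let header = match message.pop_front()? {
//!         Slot::Bytes(bytes) => bytes,
//!         Slot::Posted(_) => return None,
//!     };
//!     let payload = match message.pop_front()? {
//!         Slot::Bytes(bytes) => bytes,
//!         Slot::Posted(_) => return None,
//!     };
//!     Some((header, payload, message.into_iter().collect()))
//! }
//! ```
//! In this model the serialized part never needs to know about the posted values; the receiver
//! recovers them purely by position, which is why posted arguments can be skipped during
//! serialization.
//!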
//! ### Borrowed parameters
//! RPC methods can accept borrowed types such as `&str` and `&[u8]`, which are deserialized
//! zero-copy on the server side:
//! ```rust
//! #[web_rpc::service]
//! pub trait Greeter {
//!     fn greet(&self, name: &str, greeting: &str) -> String;
//!     fn count_bytes(&self, data: &[u8]) -> usize;
//! }
//!
//! struct GreeterServiceImpl;
//! impl Greeter for GreeterServiceImpl {
//!     fn greet(&self, name: &str, greeting: &str) -> String {
//!         format!("{greeting}, {name}!")
//!     }
//!     fn count_bytes(&self, data: &[u8]) -> usize {
//!         data.len()
//!     }
//! }
//! ```
//! This avoids unnecessary allocations: the server deserializes directly from the received
//! message bytes without copying into owned `String` or `Vec<u8>` types. On the client side,
//! borrowed parameters are serialized inline before the method returns, so standard Rust
//! lifetime rules apply. Note that only types with serde borrowing support (`&str`, `&[u8]`)
//! benefit from zero-copy deserialization.
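//!
//! To make "zero-copy" concrete, here is a small std-only sketch. The wire layout (one length
//! byte followed by UTF-8 data) and the function names are assumptions made for illustration;
//! this is not claimed to be bincode's actual encoding.
//! ```rust
//! // Hypothetical layout: one length byte, then that many UTF-8 bytes.
//! // The returned `&str` is a view into `message`; nothing is copied.
//! fn decode_borrowed(message: &[u8]) -> Option<&str> {
//!     let (&len, rest) = message.split_first()?;
//!     let bytes = rest.get(..len as usize)?;
//!     // `from_utf8` validates in place and borrows from the input slice.
//!     std::str::from_utf8(bytes).ok()
//! }
//!
//! // The owned alternative allocates a new `String` and copies the bytes.
//! fn decode_owned(message: &[u8]) -> Option<String> {
//!     decode_borrowed(message).map(str::to_owned)
//! }
//! ```
//! The borrowed variant ties the lifetime of the result to the message buffer, which is the
//! trade-off behind zero-copy deserialization: no allocation, but the buffer must outlive the
//! decoded value.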
//!
//! ### Streaming
//! Methods can return a stream of items using `impl Stream<Item = T>` as the return type.
//! The macro detects this and generates the appropriate client and server code. On the client
//! side, the method returns a [`client::StreamReceiver<T>`] which implements
//! [`futures_core::Stream`]. On the server side, the return type is preserved as-is:
//! ```rust
//! #[web_rpc::service]
//! pub trait DataSource {
//!     fn stream_data(&self, count: u32) -> impl futures_core::Stream<Item = u32>;
//! }
//!
//! struct DataSourceImpl;
//! impl DataSource for DataSourceImpl {
//!     fn stream_data(&self, count: u32) -> impl futures_core::Stream<Item = u32> {
//!         let (tx, rx) = futures_channel::mpsc::unbounded();
//!         for i in 0..count {
//!             let _ = tx.unbounded_send(i);
//!         }
//!         rx
//!     }
//! }
//! ```
//! Dropping the [`client::StreamReceiver`] sends an abort signal to the server, cancelling the
//! stream. Alternatively, calling [`close`](client::StreamReceiver::close) stops the server
//! while still allowing buffered items to be drained. Streaming methods can also be async and
//! can be combined with the `#[post(return)]` attribute for streaming JavaScript types.
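//!
//! How stream messages might be routed to the right receiver can be sketched with the standard
//! library alone. This is a simplified model rather than the crate's dispatcher: `Header`,
//! `StreamRouter`, and `collect_stream` are hypothetical names, and `std::sync::mpsc` replaces
//! the futures channels.
//! ```rust
//! use std::collections::HashMap;
//! use std::sync::mpsc::{channel, Receiver, Sender};
//!
//! // Simplified stand-ins for the stream-related message headers.
//! enum Header {
//!     StreamItem(usize),
//!     StreamEnd(usize),
//! }
//!
//! // Hypothetical router mapping a sequence id to the sender of one stream.
//! #[derive(Default)]
//! struct StreamRouter {
//!     streams: HashMap<usize, Sender<u32>>,
//! }
//!
//! impl StreamRouter {
//!     // Register a new stream and hand back the end the caller reads from.
//!     fn open(&mut self, seq_id: usize) -> Receiver<u32> {
//!         let (tx, rx) = channel();
//!         self.streams.insert(seq_id, tx);
//!         rx
//!     }
//!
//!     // Route one inbound message; `item` is only used for `StreamItem`.
//!     fn dispatch(&mut self, header: Header, item: u32) {
//!         match header {
//!             Header::StreamItem(seq_id) => {
//!                 // Items for unknown or already ended streams are ignored.
//!                 if let Some(tx) = self.streams.get(&seq_id) {
//!                     let _ = tx.send(item);
//!                 }
//!             }
//!             Header::StreamEnd(seq_id) => {
//!                 // Dropping the sender ends the stream once it is drained.
//!                 self.streams.remove(&seq_id);
//!             }
//!         }
//!     }
//! }
//!
//! // Feed a short script of messages through the router and collect the
//! // items that reach the receiver of stream 1.
//! fn collect_stream() -> Vec<u32> {
//!     let mut router = StreamRouter::default();
//!     let rx = router.open(1);
//!     router.dispatch(Header::StreamItem(1), 10);
//!     router.dispatch(Header::StreamItem(2), 99); // no such stream
//!     router.dispatch(Header::StreamItem(1), 20);
//!     router.dispatch(Header::StreamEnd(1), 0);
//!     router.dispatch(Header::StreamItem(1), 30); // arrives after the end
//!     rx.iter().collect()
//! }
//! ```
//! In this model `collect_stream()` returns `[10, 20]`: items buffered before the end marker are
//! still delivered, and anything addressed to a stream that is not registered is discarded.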
//!
//! ### Bi-directional RPC
//! In the original example, we created a server on the first port of the message channel and a
//! client on the second port. However, it is possible to define both a client and a server on each
//! side, enabling bi-directional RPC. This is particularly useful if we want to send and receive
//! messages from a worker without sending it a separate channel for the bi-directional
//! communication. Our original example can be extended as follows:
//! ```rust,no_run
//! # #[web_rpc::service]
//! # pub trait Calculator {
//! #     fn add(&self, left: u32, right: u32) -> u32;
//! # }
//! # struct CalculatorServiceImpl;
//! # impl Calculator for CalculatorServiceImpl {
//! #     fn add(&self, left: u32, right: u32) -> u32 { left + right }
//! # }
//! # async fn example() {
//! /* create channel */
//! let channel = web_sys::MessageChannel::new().unwrap();
//! let (interface1, interface2) = futures_util::future::join(
//!     web_rpc::Interface::new(channel.port1()),
//!     web_rpc::Interface::new(channel.port2()),
//! ).await;
//! /* create server1 and client1 */
//! let (client1, server1) = web_rpc::Builder::new(interface1)
//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
//!     .with_client::<CalculatorClient>()
//!     .build();
//! /* create server2 and client2 */
//! let (client2, server2) = web_rpc::Builder::new(interface2)
//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
//!     .with_client::<CalculatorClient>()
//!     .build();
//! # }
//! ```

use std::{
    cell::RefCell,
    marker::PhantomData,
    pin::Pin,
    rc::Rc,
    task::{Context, Poll},
};

use futures_channel::mpsc;
use futures_core::{future::LocalBoxFuture, Future};
use futures_util::{FutureExt, StreamExt};
use gloo_events::EventListener;
use js_sys::{ArrayBuffer, Uint8Array};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use wasm_bindgen::JsCast;

#[doc(hidden)]
pub use bincode;
#[doc(hidden)]
pub use futures_channel;
#[doc(hidden)]
pub use futures_core;
#[doc(hidden)]
pub use futures_util;
#[doc(hidden)]
pub use gloo_events;
#[doc(hidden)]
pub use js_sys;
#[doc(hidden)]
pub use pin_utils;
#[doc(hidden)]
pub use serde;
#[doc(hidden)]
pub use wasm_bindgen;

pub use web_rpc_macro::service;

pub mod client;
pub mod interface;
pub mod port;
#[doc(hidden)]
pub mod service;

pub use interface::Interface;

#[doc(hidden)]
#[derive(Serialize, Deserialize)]
pub enum MessageHeader {
    Request(usize),
    Abort(usize),
    Response(usize),
    StreamItem(usize),
    StreamEnd(usize),
}

/// This struct allows one to configure the RPC interface prior to creating it.
/// To get an instance of this struct, call [`Builder<C, S>::new`] with
/// an [`Interface`].
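///
/// The two type parameters record what has been configured so far, which is why `build`
/// can return a different type for each configuration. The following std-only miniature
/// illustrates that pattern; `MiniBuilder`, `Client`, and `Service` are hypothetical
/// stand-ins, and the string return values merely label which `build` was selected.
/// ```rust
/// use std::marker::PhantomData;
///
/// // Hypothetical marker types standing in for a generated client and service.
/// struct Client;
/// struct Service;
///
/// // Miniature builder whose type parameters track the configuration.
/// struct MiniBuilder<C, S> {
///     client: PhantomData<C>,
///     service: S,
/// }
///
/// impl MiniBuilder<(), ()> {
///     fn new() -> Self {
///         MiniBuilder { client: PhantomData, service: () }
///     }
/// }
///
/// impl<C> MiniBuilder<C, ()> {
///     // Only available while no service has been set.
///     fn with_service(self, service: Service) -> MiniBuilder<C, Service> {
///         MiniBuilder { client: self.client, service }
///     }
/// }
///
/// impl<S> MiniBuilder<(), S> {
///     // Only available while no client has been set.
///     fn with_client(self) -> MiniBuilder<Client, S> {
///         MiniBuilder { client: PhantomData, service: self.service }
///     }
/// }
///
/// // Each configuration gets its own `build`, so the return type differs.
/// impl MiniBuilder<Client, ()> {
///     fn build(self) -> &'static str { "client" }
/// }
/// impl MiniBuilder<(), Service> {
///     fn build(self) -> &'static str { "server" }
/// }
/// impl MiniBuilder<Client, Service> {
///     fn build(self) -> (&'static str, &'static str) { ("client", "server") }
/// }
/// ```
/// Because `with_service` and `with_client` each exist only for the unconfigured state,
/// calling either one twice is rejected at compile time in this sketch.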
pub struct Builder<C, S> {
    client: PhantomData<C>,
    service: S,
    interface: Interface,
}

impl Builder<(), ()> {
    /// Create a new builder from an [`Interface`]
    pub fn new(interface: Interface) -> Self {
        Self {
            interface,
            client: PhantomData::<()>,
            service: (),
        }
    }
}

impl<C> Builder<C, ()> {
    /// Configure the RPC interface with a service that implements methods
    /// that can be called from the other side of the channel. To use this method,
    /// you need to specify the type `S` which is the service type generated by the
    /// attribute macro [`macro@service`]. The implementation parameter is then an
    /// instance of something that implements the trait to which you applied the
    /// [`macro@service`] macro. For example, if you have a trait `Calculator` to
    /// which you have applied [`macro@service`], you would use this method as follows:
    /// ```rust,no_run
    /// # #[web_rpc::service]
    /// # pub trait Calculator {
    /// #     fn add(&self, left: u32, right: u32) -> u32;
    /// # }
    /// # struct CalculatorServiceImpl;
    /// # impl Calculator for CalculatorServiceImpl {
    /// #     fn add(&self, left: u32, right: u32) -> u32 { left + right }
    /// # }
    /// # fn example(some_interface: web_rpc::Interface) {
    /// let server = web_rpc::Builder::new(some_interface)
    ///     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
    ///     .build();
    /// # }
    /// ```
    pub fn with_service<S: service::Service>(self, implementation: impl Into<S>) -> Builder<C, S> {
        let service = implementation.into();
        let Builder {
            interface, client, ..
        } = self;
        Builder {
            interface,
            client,
            service,
        }
    }
}

impl<S> Builder<(), S> {
    /// Configure the RPC interface with a client that allows you to execute RPCs on the
    /// server. The builder will automatically instantiate the client for you; you just
    /// need to provide the type which is generated via the [`macro@service`] attribute
    /// macro. For example, if you had a trait `Calculator` to which you applied the
    /// [`macro@service`] attribute macro, the macro would have generated a `CalculatorClient`
    /// struct which you can use as the `C` in this function.
    pub fn with_client<C: client::Client>(self) -> Builder<C, S> {
        let Builder {
            interface, service, ..
        } = self;
        Builder {
            interface,
            client: PhantomData::<C>,
            service,
        }
    }
}

/// `Server` is the server that is returned from the [`Builder::build`] method, provided
/// you configured the RPC interface with a service. Note that `Server` implements [`Future`] and
/// needs to be polled in order to execute and respond to inbound RPC requests.
#[must_use = "Server must be polled in order for RPC requests to be executed"]
pub struct Server {
    _listener: Rc<EventListener>,
    task: LocalBoxFuture<'static, ()>,
}

impl Future for Server {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.task.poll_unpin(cx)
    }
}

impl<C> Builder<C, ()>
where
    C: client::Client + From<client::Configuration<C::Response>> + 'static,
    <C as client::Client>::Response: DeserializeOwned,
{
    /// Build function for client-only RPC interfaces.
    pub fn build(self) -> C {
        let Builder {
            interface:
                Interface {
                    port,
                    listener,
                    mut messages_rx,
                },
            ..
        } = self;
        let client_callback_map: Rc<RefCell<client::CallbackMap<C::Response>>> = Default::default();
        let client_callback_map_cloned = client_callback_map.clone();
        let stream_callback_map: Rc<RefCell<client::StreamCallbackMap<C::Response>>> =
            Default::default();
        let stream_callback_map_cloned = stream_callback_map.clone();
        let dispatcher = async move {
            while let Some(array) = messages_rx.next().await {
                let header_bytes =
                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
                let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
                match header {
                    MessageHeader::Response(seq_id) => {
                        let payload_bytes =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
                        if let Some(callback_tx) =
                            client_callback_map_cloned.borrow_mut().remove(&seq_id)
                        {
                            let _ = callback_tx.send((response, array));
                        }
                    }
                    MessageHeader::StreamItem(seq_id) => {
                        let payload_bytes =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
                        if let Some(tx) = stream_callback_map_cloned.borrow().get(&seq_id) {
                            let _ = tx.unbounded_send((response, array));
                        }
                    }
                    MessageHeader::StreamEnd(seq_id) => {
                        stream_callback_map_cloned.borrow_mut().remove(&seq_id);
                    }
                    _ => panic!("client received a server message"),
                }
            }
        }
        .boxed_local()
        .shared();
        let port_cloned = port.clone();
        let abort_sender = move |seq_id: usize| {
            let header = MessageHeader::Abort(seq_id);
            let header_bytes = bincode::serialize(&header).unwrap();
            let buffer = js_sys::Uint8Array::from(&header_bytes[..]).buffer();
            let post_args = js_sys::Array::of1(&buffer);
            let transfer_args = js_sys::Array::of1(&buffer);
            port_cloned
                .post_message(&post_args, &transfer_args)
                .unwrap();
        };
        C::from((
            client_callback_map,
            stream_callback_map,
            port,
            Rc::new(listener),
            dispatcher,
            Rc::new(abort_sender),
        ))
    }
}

impl<S> Builder<(), S>
where
    S: service::Service + 'static,
    <S as service::Service>::Response: Serialize,
{
    /// Build function for server-only RPC interfaces.
    pub fn build(self) -> Server {
        let Builder {
            service,
            interface:
                Interface {
                    port,
                    listener,
                    mut messages_rx,
                },
            ..
        } = self;
        let (server_requests_tx, server_requests_rx) = mpsc::unbounded();
        let (abort_requests_tx, abort_requests_rx) = mpsc::unbounded();
        let dispatcher = async move {
            while let Some(array) = messages_rx.next().await {
                let header_bytes =
                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
                let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
                match header {
                    MessageHeader::Request(seq_id) => {
                        let payload =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        server_requests_tx
                            .unbounded_send((seq_id, payload, array))
                            .unwrap();
                    }
                    MessageHeader::Abort(seq_id) => {
                        abort_requests_tx.unbounded_send(seq_id).unwrap();
                    }
                    _ => panic!("server received a client message"),
                }
            }
        }
        .boxed_local()
        .shared();
        Server {
            _listener: Rc::new(listener),
            task: service::task::<S>(
                service,
                port,
                dispatcher,
                server_requests_rx,
                abort_requests_rx,
            )
            .boxed_local(),
        }
    }
}

impl<C, S> Builder<C, S>
where
    C: client::Client + From<client::Configuration<C::Response>> + 'static,
    S: service::Service + 'static,
    <S as service::Service>::Response: Serialize,
    <C as client::Client>::Response: DeserializeOwned,
{
    /// Build function for client-server RPC interfaces.
    pub fn build(self) -> (C, Server) {
        let Builder {
            service: server,
            interface:
                Interface {
                    port,
                    listener,
                    mut messages_rx,
                },
            ..
        } = self;
        let client_callback_map: Rc<RefCell<client::CallbackMap<C::Response>>> = Default::default();
        let stream_callback_map: Rc<RefCell<client::StreamCallbackMap<C::Response>>> =
            Default::default();
        let (server_requests_tx, server_requests_rx) = mpsc::unbounded();
        let (abort_requests_tx, abort_requests_rx) = mpsc::unbounded();
        let client_callback_map_cloned = client_callback_map.clone();
        let stream_callback_map_cloned = stream_callback_map.clone();
        let dispatcher = async move {
            while let Some(array) = messages_rx.next().await {
                let header_bytes =
                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
                let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
                match header {
                    MessageHeader::Response(seq_id) => {
                        let payload_bytes =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
                        if let Some(callback_tx) =
                            client_callback_map_cloned.borrow_mut().remove(&seq_id)
                        {
                            let _ = callback_tx.send((response, array));
                        }
                    }
                    MessageHeader::StreamItem(seq_id) => {
                        let payload_bytes =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
                        if let Some(tx) = stream_callback_map_cloned.borrow().get(&seq_id) {
                            let _ = tx.unbounded_send((response, array));
                        }
                    }
                    MessageHeader::StreamEnd(seq_id) => {
                        stream_callback_map_cloned.borrow_mut().remove(&seq_id);
                    }
                    MessageHeader::Request(seq_id) => {
                        let payload =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        server_requests_tx
                            .unbounded_send((seq_id, payload, array))
                            .unwrap();
                    }
                    MessageHeader::Abort(seq_id) => {
                        abort_requests_tx.unbounded_send(seq_id).unwrap();
                    }
                }
            }
        }
        .boxed_local()
        .shared();
        let port_cloned = port.clone();
        let abort_sender = move |seq_id: usize| {
            let header = MessageHeader::Abort(seq_id);
            let header_bytes = bincode::serialize(&header).unwrap();
            let buffer = js_sys::Uint8Array::from(&header_bytes[..]).buffer();
            let post_args = js_sys::Array::of1(&buffer);
            let transfer_args = js_sys::Array::of1(&buffer);
            port_cloned
                .post_message(&post_args, &transfer_args)
                .unwrap();
        };
        let listener = Rc::new(listener);
        let client = C::from((
            client_callback_map,
            stream_callback_map,
            port.clone(),
            listener.clone(),
            dispatcher.clone(),
            Rc::new(abort_sender),
        ));
        let server = Server {
            _listener: listener,
            task: service::task::<S>(
                server,
                port,
                dispatcher,
                server_requests_rx,
                abort_requests_rx,
            )
            .boxed_local(),
        };
        (client, server)
    }
}
631}