web_rpc/lib.rs
//! The `web-rpc` crate is a library for performing RPCs (remote procedure calls) between
//! browsing contexts, web workers, and channels. It allows you to define an RPC using a trait
//! similar to Google's [tarpc](https://github.com/google/tarpc) and will transparently
//! handle the serialization and deserialization of the arguments. Moreover, it can post
//! anything that implements [`AsRef<JsValue>`](https://docs.rs/wasm-bindgen/latest/wasm_bindgen/struct.JsValue.html) and also supports transferring ownership.
//!
//! ## Quick start
//! To get started, define a trait for your RPC service as follows. Annotate this trait with the
//! `service` procedural macro that is exported by this crate:
//! ```rust
//! #[web_rpc::service]
//! pub trait Calculator {
//!     fn add(left: u32, right: u32) -> u32;
//! }
//! ```
//! This macro will generate the structs `CalculatorClient`, `CalculatorService`, and a new trait
//! `Calculator` that you can use to implement the service as follows:
//! ```rust
//! struct CalculatorServiceImpl;
//!
//! impl Calculator for CalculatorServiceImpl {
//!     fn add(&self, left: u32, right: u32) -> u32 {
//!         left + right
//!     }
//! }
//! ```
//! Note that the version of the trait emitted from the macro adds a `&self` receiver. Although not
//! used in this example, this is useful when we want the RPC to modify some state (via interior
//! mutability). Now that we have defined our RPC, let's create a client and server for it! In this
//! example, we will use [`MessageChannel`](https://docs.rs/web-sys/latest/web_sys/struct.MessageChannel.html)
//! since it is easy to construct and test; however, a more common case would be to construct the
//! channel from a [`Worker`](https://docs.rs/web-sys/latest/web_sys/struct.Worker.html) or a
//! [`DedicatedWorkerGlobalScope`](https://docs.rs/web-sys/latest/web_sys/struct.DedicatedWorkerGlobalScope.html).
//! Let's start by defining the server:
//! ```rust
//! // create a MessageChannel (the constructor returns a `Result`)
//! let channel = web_sys::MessageChannel::new().unwrap();
//! // create two interfaces from the ports
//! let (server_interface, client_interface) = futures_util::future::join(
//!     web_rpc::Interface::new(channel.port1()),
//!     web_rpc::Interface::new(channel.port2()),
//! ).await;
//! // create a server with the first interface
//! let server = web_rpc::Builder::new(server_interface)
//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
//!     .build();
//! // spawn the server
//! wasm_bindgen_futures::spawn_local(server);
//! ```
//! [`Interface::new`] is async since there is no way to synchronously check whether a channel or
//! a worker is ready to receive messages. To work around this, temporary listeners are attached to
//! determine when a channel is ready for communication. The server returned by the build method is
//! a future that can be added to the browser's event loop using
//! [`wasm_bindgen_futures::spawn_local`]; however, this will run the server indefinitely. For more
//! control, consider wrapping the server with [`futures_util::FutureExt::remote_handle`] before
//! spawning it, which will shut down the server once the handle has been dropped. Moving on to the
//! client:
//! ```rust
//! // create a client using the second interface
//! let client = web_rpc::Builder::new(client_interface)
//!     .with_client::<CalculatorClient>()
//!     .build();
//! /* call `add` */
//! assert_eq!(client.add(41, 1).await, 42);
//! ```
//! That is it! Underneath the hood, the client will serialize its arguments using bincode and
//! transfer the bytes to the server. The server will deserialize those arguments and run
//! `<CalculatorServiceImpl as Calculator>::add` before returning the result to the client. Note
//! that we are only awaiting the response of the call to `add`; the request itself is sent
//! synchronously before we await anything.
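//!
//! To make the request/response pairing above more concrete, the following is a minimal,
//! standard-library-only sketch of how responses could be matched to callers by sequence ID.
//! `PendingCalls`, `register`, and `dispatch` are hypothetical names used for illustration; they
//! are not part of this crate's API, and the real client works with futures and a message port
//! rather than `std::sync::mpsc` channels:
//! ```rust
//! use std::collections::HashMap;
//! use std::sync::mpsc::{channel, Receiver, Sender};
//!
//! /// Hypothetical stand-in for a client that correlates responses by sequence ID.
//! pub struct PendingCalls<R> {
//!     next_seq_id: usize,
//!     callbacks: HashMap<usize, Sender<R>>,
//! }
//!
//! impl<R> PendingCalls<R> {
//!     pub fn new() -> Self {
//!         Self { next_seq_id: 0, callbacks: HashMap::new() }
//!     }
//!
//!     /// Register a call and return its sequence ID with a receiver for the response.
//!     pub fn register(&mut self) -> (usize, Receiver<R>) {
//!         let seq_id = self.next_seq_id;
//!         self.next_seq_id += 1;
//!         let (tx, rx) = channel();
//!         self.callbacks.insert(seq_id, tx);
//!         (seq_id, rx)
//!     }
//!
//!     /// Route a response to the caller waiting on `seq_id`.
//!     /// Returns `false` when nobody is waiting for that ID anymore.
//!     pub fn dispatch(&mut self, seq_id: usize, response: R) -> bool {
//!         match self.callbacks.remove(&seq_id) {
//!             Some(tx) => tx.send(response).is_ok(),
//!             None => false,
//!         }
//!     }
//! }
//! ```
//! Because each entry is removed when its response arrives, responses may come back in any
//! order, and a second response for the same ID is simply ignored.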
//!
//! ## Advanced examples
//! Now that we have the basic idea of how to define an RPC trait and set up a server and client,
//! let's dive into some of the more advanced features of this library!
//!
//! ### Synchronous and asynchronous RPC methods
//! Server methods can be asynchronous! That is, you can define the following RPC trait and service
//! implementation:
//! ```rust
//! use std::time::Duration;
//!
//! #[web_rpc::service]
//! pub trait Sleep {
//!     // the return type must match the implementation below
//!     async fn sleep(interval: Duration) -> bool;
//! }
//!
//! struct SleepServiceImpl;
//! impl Sleep for SleepServiceImpl {
//!     async fn sleep(&self, interval: Duration) -> bool {
//!         gloo_timers::future::sleep(interval).await;
//!         // sleep completed (was not cancelled)
//!         true
//!     }
//! }
//! ```
//! Asynchronous RPC methods are run concurrently on the server and also support cancellation if the
//! future on the client side is dropped. However, such a future is only returned from a client
//! method if the RPC returns a value. Otherwise, the RPC is considered a notification.
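//!
//! The cancel-on-drop behaviour described above can be pictured with a small, standard-library-only
//! sketch. `CallGuard` and `pending_aborts` are hypothetical names introduced here for
//! illustration; the crate itself ties cancellation to the future returned by the client method,
//! so treat this as a model of the idea rather than the actual implementation:
//! ```rust
//! use std::sync::mpsc::{Receiver, Sender};
//!
//! /// Hypothetical handle for an in-flight call; dropping it requests cancellation.
//! pub struct CallGuard {
//!     seq_id: usize,
//!     abort_tx: Sender<usize>,
//!     completed: bool,
//! }
//!
//! impl CallGuard {
//!     pub fn new(seq_id: usize, abort_tx: Sender<usize>) -> Self {
//!         Self { seq_id, abort_tx, completed: false }
//!     }
//!
//!     /// Mark the call as finished so that dropping the guard sends no abort.
//!     pub fn complete(mut self) {
//!         self.completed = true;
//!     }
//! }
//!
//! impl Drop for CallGuard {
//!     fn drop(&mut self) {
//!         if !self.completed {
//!             // The receiver may already be gone; ignoring the error is fine here.
//!             let _ = self.abort_tx.send(self.seq_id);
//!         }
//!     }
//! }
//!
//! /// Drain the abort requests that are currently queued.
//! pub fn pending_aborts(abort_rx: &Receiver<usize>) -> Vec<usize> {
//!     abort_rx.try_iter().collect()
//! }
//! ```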
//!
//! ### Notifications
//! Notifications are RPCs that do not return anything. On the client side, the method is completely
//! synchronous and also returns nothing. This setup is useful if you need to communicate with
//! another part of your application but cannot yield to the event loop.
//!
//! The implication of this, however, is that even if the server method is asynchronous, we are
//! unable to cancel it from the client side since we do not have a future that can be dropped.
//!
//! ### Posting and transferring JavaScript types
//! In the example above, we discussed how the client serializes its arguments before sending them
//! to the server. This approach is convenient, but how do we send web types such as a
//! `WebAssembly.Module` or an `OffscreenCanvas` that have no serializable representation? Well, we
//! are in luck since this happens to be one of the key features of this crate. Consider the
//! following RPC trait:
//! ```rust
//! #[web_rpc::service]
//! pub trait Concat {
//!     #[post(left, right, return)]
//!     fn concat_with_space(
//!         left: js_sys::JsString,
//!         right: js_sys::JsString
//!     ) -> js_sys::JsString;
//! }
//! ```
//! All we have done is add the `post` attribute to the method and list the arguments that we
//! would like to be posted to the other side. Under the hood, the implementation of the client will
//! then skip these arguments during serialization and just append them after the serialized message
//! to the array that will be posted. As shown above, this also works for the return type by just
//! specifying `return` in the post attribute. For web types that need to be transferred, we simply
//! wrap them in `transfer` as follows:
//! ```rust
//! #[web_rpc::service]
//! pub trait GameEngine {
//!     #[post(transfer(canvas))]
//!     fn send_canvas(
//!         // `OffscreenCanvas` is provided by `web_sys`, not `js_sys`
//!         canvas: web_sys::OffscreenCanvas,
//!     );
//! }
//! ```
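//! The layout of a posted message (serialized bytes first, posted values appended after them) can
//! be modeled without any web types. In this sketch, `Slot`, `pack`, and `unpack` are hypothetical
//! names, and a `String` stands in for a posted JavaScript value; it only illustrates the ordering
//! described in this section and is not the crate's actual encoding code:
//! ```rust
//! use std::collections::VecDeque;
//!
//! /// Hypothetical stand-in for one slot of the array that is posted.
//! #[derive(Debug, PartialEq)]
//! pub enum Slot {
//!     /// The serialized part of the message (arguments that are not posted).
//!     Bytes(Vec<u8>),
//!     /// A value that is posted as-is, modeled here as a string.
//!     Posted(String),
//! }
//!
//! /// Build the outgoing array: serialized bytes first, posted values after.
//! pub fn pack(serialized: Vec<u8>, posted: Vec<String>) -> VecDeque<Slot> {
//!     let mut array = VecDeque::with_capacity(posted.len() + 1);
//!     array.push_back(Slot::Bytes(serialized));
//!     array.extend(posted.into_iter().map(Slot::Posted));
//!     array
//! }
//!
//! /// Split an incoming array back into the serialized bytes and the posted values.
//! /// Returns `None` if the array does not follow the expected layout.
//! pub fn unpack(mut array: VecDeque<Slot>) -> Option<(Vec<u8>, Vec<String>)> {
//!     let bytes = match array.pop_front()? {
//!         Slot::Bytes(bytes) => bytes,
//!         Slot::Posted(_) => return None,
//!     };
//!     let mut posted = Vec::with_capacity(array.len());
//!     for slot in array {
//!         match slot {
//!             Slot::Posted(value) => posted.push(value),
//!             Slot::Bytes(_) => return None,
//!         }
//!     }
//!     Some((bytes, posted))
//! }
//! ```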
//!
//! ### Bi-directional RPC
//! In the original example, we created a server on the first port of the message channel and a
//! client on the second port. However, it is possible to define both a client and a server on each
//! side, enabling bi-directional RPC. This is particularly useful if we want to send and receive
//! messages from a worker without sending it a separate channel for the bi-directional
//! communication. Our original example can be extended as follows:
//! ```rust
//! /* create channel */
//! let channel = web_sys::MessageChannel::new().unwrap();
//! let (interface1, interface2) = futures_util::future::join(
//!     web_rpc::Interface::new(channel.port1()),
//!     web_rpc::Interface::new(channel.port2()),
//! ).await;
//! /* create server1 and client1 */
//! let (client1, server1) = web_rpc::Builder::new(interface1)
//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
//!     .with_client::<CalculatorClient>()
//!     .build();
//! /* create server2 and client2 */
//! let (client2, server2) = web_rpc::Builder::new(interface2)
//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
//!     .with_client::<CalculatorClient>()
//!     .build();
//! ```

use std::{
    cell::RefCell,
    marker::PhantomData,
    pin::Pin,
    rc::Rc,
    task::{Context, Poll},
};

use futures_channel::mpsc;
use futures_core::{future::LocalBoxFuture, Future};
use futures_util::{FutureExt, StreamExt};
use gloo_events::EventListener;
use js_sys::{ArrayBuffer, Uint8Array};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use wasm_bindgen::JsCast;

#[doc(hidden)]
pub use bincode;
#[doc(hidden)]
pub use futures_channel;
#[doc(hidden)]
pub use futures_core;
#[doc(hidden)]
pub use futures_util;
#[doc(hidden)]
pub use gloo_events;
#[doc(hidden)]
pub use js_sys;
#[doc(hidden)]
pub use pin_utils;
#[doc(hidden)]
pub use serde;
#[doc(hidden)]
pub use wasm_bindgen;

pub use web_rpc_macro::service;

pub mod client;
pub mod interface;
pub mod port;
#[doc(hidden)]
pub mod service;

pub use interface::Interface;

#[doc(hidden)]
#[derive(Serialize, Deserialize)]
pub enum Message<Request, Response> {
    Request(usize, Request),
    Abort(usize),
    Response(usize, Response),
}

/// This struct allows one to configure the RPC interface prior to creating it.
/// To get an instance of this struct, call [`Builder<C, S>::new`] with
/// an [`Interface`].
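///
/// The builder tracks in its type parameters whether a client and/or a service has been
/// configured, and a different `build` method is available for each combination. The following
/// is a simplified, standard-library-only sketch of that typestate idea. `SketchBuilder`, the
/// local `Client` and `Service` traits, `CalcClient`, and `Doubler` are hypothetical stand-ins
/// for illustration and are not the items exported by this crate:
/// ```rust
/// use std::marker::PhantomData;
///
/// // Hypothetical stand-ins for the client and service traits.
/// pub trait Client {
///     fn name() -> &'static str;
/// }
/// pub trait Service {
///     fn call(&self, input: u32) -> u32;
/// }
///
/// pub struct SketchBuilder<C, S> {
///     client: PhantomData<C>,
///     service: S,
/// }
///
/// impl SketchBuilder<(), ()> {
///     pub fn new() -> Self {
///         Self { client: PhantomData, service: () }
///     }
/// }
///
/// impl<C> SketchBuilder<C, ()> {
///     pub fn with_service<S: Service>(self, service: S) -> SketchBuilder<C, S> {
///         SketchBuilder { client: self.client, service }
///     }
/// }
///
/// impl<S> SketchBuilder<(), S> {
///     pub fn with_client<C: Client>(self) -> SketchBuilder<C, S> {
///         SketchBuilder { client: PhantomData, service: self.service }
///     }
/// }
///
/// // One `build` per configuration; `()` implements neither trait, so these do not overlap.
/// impl<C: Client> SketchBuilder<C, ()> {
///     pub fn build(self) -> &'static str {
///         C::name()
///     }
/// }
/// impl<S: Service> SketchBuilder<(), S> {
///     pub fn build(self) -> S {
///         self.service
///     }
/// }
/// impl<C: Client, S: Service> SketchBuilder<C, S> {
///     pub fn build(self) -> (&'static str, S) {
///         (C::name(), self.service)
///     }
/// }
///
/// pub struct CalcClient;
/// impl Client for CalcClient {
///     fn name() -> &'static str {
///         "calculator"
///     }
/// }
///
/// pub struct Doubler;
/// impl Service for Doubler {
///     fn call(&self, input: u32) -> u32 {
///         input * 2
///     }
/// }
/// ```
/// With this shape, calling `build` on a builder that has neither a client nor a service is a
/// compile-time error rather than a runtime failure.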
pub struct Builder<C, S> {
    client: PhantomData<C>,
    service: S,
    interface: Interface,
}

impl Builder<(), ()> {
    /// Create a new builder from an [`Interface`]
    pub fn new(interface: Interface) -> Self {
        Self {
            interface,
            client: PhantomData::<()>,
            service: (),
        }
    }
}

impl<C> Builder<C, ()> {
    /// Configure the RPC interface with a service that implements methods
    /// that can be called from the other side of the channel. To use this method,
    /// you need to specify the type `S` which is the service type generated by the
    /// attribute macro [`macro@service`]. The implementation parameter is then an
    /// instance of something that implements the trait to which you applied the
    /// [`macro@service`] macro. For example, if you have a trait `Calculator` to
    /// which you have applied [`macro@service`], you would use this method as follows:
    /// ```
    /// struct CalculatorServiceImpl;
    /// impl Calculator for CalculatorServiceImpl { /* add Calculator's methods */ }
    /// let server = Builder::new(some_interface)
    ///     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
    ///     .build();
    /// ```
    pub fn with_service<S: service::Service>(self, implementation: impl Into<S>) -> Builder<C, S> {
        let service = implementation.into();
        let Builder {
            interface, client, ..
        } = self;
        Builder {
            interface,
            client,
            service,
        }
    }
}

impl<S> Builder<(), S> {
    /// Configure the RPC interface with a client that allows you to execute RPCs on the
    /// server. The builder will automatically instantiate the client for you; you just
    /// need to provide the type which is generated via the [`macro@service`] attribute
    /// macro. For example, if you had a trait `Calculator` to which you applied the
    /// [`macro@service`] attribute macro, the macro would have generated a `CalculatorClient`
    /// struct which you can use as the `C` in this function.
    pub fn with_client<C: client::Client>(self) -> Builder<C, S> {
        let Builder {
            interface, service, ..
        } = self;
        Builder {
            interface,
            client: PhantomData::<C>,
            service,
        }
    }
}

/// `Server` is the server that is returned from the [`Builder::build`] method, provided that
/// you configured the RPC interface with a service. Note that `Server` implements `Future` and
/// needs to be polled in order to execute and respond to inbound RPC requests.
#[must_use = "Server must be polled in order for RPC requests to be executed"]
pub struct Server {
    _listener: Rc<EventListener>,
    task: LocalBoxFuture<'static, ()>,
}

impl Future for Server {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.task.poll_unpin(cx)
    }
}

impl<C> Builder<C, ()>
where
    C: client::Client + From<client::Configuration<C::Request, C::Response>> + 'static,
    <C as client::Client>::Response: DeserializeOwned,
    <C as client::Client>::Request: Serialize,
{
    /// Build function for client-only RPC interfaces.
    pub fn build(self) -> C {
        let Builder {
            interface:
                Interface {
                    port,
                    listener,
                    mut messages_rx,
                },
            ..
        } = self;
        let client_callback_map: Rc<RefCell<client::CallbackMap<C::Response>>> = Default::default();
        let client_callback_map_cloned = client_callback_map.clone();
        let dispatcher = async move {
            while let Some(array) = messages_rx.next().await {
                let message =
                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
                match bincode::deserialize::<Message<(), C::Response>>(&message).unwrap() {
                    Message::Response(seq_id, response) => {
                        if let Some(callback_tx) =
                            client_callback_map_cloned.borrow_mut().remove(&seq_id)
                        {
                            let _ = callback_tx.send((response, array));
                        }
                    }
                    _ => panic!("client received a server message"),
                }
            }
        }
        .boxed_local()
        .shared();
        let port_cloned = port.clone();
        let abort_sender = move |seq_id: usize| {
            let abort = Message::<C::Request, ()>::Abort(seq_id);
            let abort = bincode::serialize(&abort).unwrap();
            let buffer = js_sys::Uint8Array::from(&abort[..]).buffer();
            let post_args = js_sys::Array::of1(&buffer);
            let transfer_args = js_sys::Array::of1(&buffer);
            port_cloned
                .post_message(&post_args, &transfer_args)
                .unwrap();
        };
        let request_serializer = |seq_id: usize, request: C::Request| {
            let request = Message::<C::Request, ()>::Request(seq_id, request);
            bincode::serialize(&request).unwrap()
        };
        C::from((
            client_callback_map,
            port,
            Rc::new(listener),
            dispatcher,
            Rc::new(request_serializer),
            Rc::new(abort_sender),
        ))
    }
}

impl<S> Builder<(), S>
where
    S: service::Service + 'static,
    <S as service::Service>::Request: DeserializeOwned,
    <S as service::Service>::Response: Serialize,
{
    /// Build function for server-only RPC interfaces.
    pub fn build(self) -> Server {
        let Builder {
            service,
            interface:
                Interface {
                    port,
                    listener,
                    mut messages_rx,
                },
            ..
        } = self;
        let (server_requests_tx, server_requests_rx) = mpsc::unbounded();
        let (abort_requests_tx, abort_requests_rx) = mpsc::unbounded();
        let dispatcher = async move {
            while let Some(array) = messages_rx.next().await {
                let message =
                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
                match bincode::deserialize::<Message<S::Request, ()>>(&message).unwrap() {
                    Message::Request(seq_id, request) => server_requests_tx
                        .unbounded_send((seq_id, request, array))
                        .unwrap(),
                    Message::Abort(seq_id) => abort_requests_tx.unbounded_send(seq_id).unwrap(),
                    _ => panic!("server received a client message"),
                }
            }
        }
        .boxed_local()
        .shared();
        Server {
            _listener: Rc::new(listener),
            task: service::task::<S, ()>(
                service,
                port,
                dispatcher,
                server_requests_rx,
                abort_requests_rx,
            )
            .boxed_local(),
        }
    }
}

impl<C, S> Builder<C, S>
where
    C: client::Client + From<client::Configuration<C::Request, C::Response>> + 'static,
    S: service::Service + 'static,
    <S as service::Service>::Request: DeserializeOwned,
    <S as service::Service>::Response: Serialize,
    <C as client::Client>::Request: Serialize,
    <C as client::Client>::Response: DeserializeOwned,
{
    /// Build function for client-server RPC interfaces.
    pub fn build(self) -> (C, Server) {
        let Builder {
            service: server,
            interface:
                Interface {
                    port,
                    listener,
                    mut messages_rx,
                },
            ..
        } = self;
        let client_callback_map: Rc<RefCell<client::CallbackMap<C::Response>>> = Default::default();
        let (server_requests_tx, server_requests_rx) = mpsc::unbounded();
        let (abort_requests_tx, abort_requests_rx) = mpsc::unbounded();
        let client_callback_map_cloned = client_callback_map.clone();
        let dispatcher = async move {
            while let Some(array) = messages_rx.next().await {
                let message = array.shift().dyn_into::<ArrayBuffer>().unwrap();
                let message = Uint8Array::new(&message).to_vec();
                match bincode::deserialize::<Message<S::Request, C::Response>>(&message).unwrap() {
                    Message::Response(seq_id, response) => {
                        if let Some(callback_tx) =
                            client_callback_map_cloned.borrow_mut().remove(&seq_id)
                        {
                            let _ = callback_tx.send((response, array));
                        }
                    }
                    Message::Request(seq_id, request) => server_requests_tx
                        .unbounded_send((seq_id, request, array))
                        .unwrap(),
                    Message::Abort(seq_id) => abort_requests_tx.unbounded_send(seq_id).unwrap(),
                }
            }
        }
        .boxed_local()
        .shared();
        let port_cloned = port.clone();
        let abort_sender = move |seq_id: usize| {
            let abort = Message::<C::Request, S::Response>::Abort(seq_id);
            let abort = bincode::serialize(&abort).unwrap();
            let buffer = js_sys::Uint8Array::from(&abort[..]).buffer();
            let post_args = js_sys::Array::of1(&buffer);
            let transfer_args = js_sys::Array::of1(&buffer);
            port_cloned
                .post_message(&post_args, &transfer_args)
                .unwrap();
        };
        let request_serializer = |seq_id: usize, request: C::Request| {
            let request = Message::<C::Request, S::Response>::Request(seq_id, request);
            bincode::serialize(&request).unwrap()
        };
        let listener = Rc::new(listener);
        let client = C::from((
            client_callback_map,
            port.clone(),
            listener.clone(),
            dispatcher.clone(),
            Rc::new(request_serializer),
            Rc::new(abort_sender),
        ));
        let server = Server {
            _listener: listener,
            task: service::task::<S, C::Request>(
                server,
                port,
                dispatcher,
                server_requests_rx,
                abort_requests_rx,
            )
            .boxed_local(),
        };
        (client, server)
    }
}