web_rpc/lib.rs
1//! Bidirectional RPC for browsing contexts, web workers, and message channels.
2//!
3//! This crate allows you to define a service as a trait and annotate it with
4//! [`#[web_rpc::service]`](macro@service). The macro then produces a `*Client`, a `*Service`,
5//! and a forwarding trait that you can implement on the server side.
6//!
7//! Routing is inferred from each type: anything implementing
8//! [`AsRef<JsValue>`](https://docs.rs/wasm-bindgen/latest/wasm_bindgen/struct.JsValue.html) is
9//! posted through `postMessage` directly and everything that is serializable is first encoded
10//! via bincode. There is special support for `Option<T>` and `Result<T, E>` to allow Javascript
11//! types to be embedded within these types. This behaviour is recursive.
12//!
13//! # Quickstart
14//! ```rust
15//! #[web_rpc::service]
16//! pub trait Calculator {
17//! fn add(&self, left: u32, right: u32) -> u32;
18//! }
19//! struct Calc;
20//! impl Calculator for Calc {
21//! fn add(&self, left: u32, right: u32) -> u32 { left + right }
22//! }
23//! ```
24//! Wire up over a `MessageChannel`, [`Worker`](https://docs.rs/web-sys/latest/web_sys/struct.Worker.html),
25//! or any [`MessagePort`](https://docs.rs/web-sys/latest/web_sys/struct.MessagePort.html).
26//! Each call to [`Interface::new`] is async because temporary listeners need to detect when
27//! both ends are ready.
28//! ```rust,no_run
29//! # #[web_rpc::service]
30//! # pub trait Calculator { fn add(&self, l: u32, r: u32) -> u32; }
31//! # struct Calc;
32//! # impl Calculator for Calc { fn add(&self, l: u32, r: u32) -> u32 { l + r } }
33//! # async fn run() {
34//! let channel = web_sys::MessageChannel::new().unwrap();
35//! let (server_iface, client_iface) = futures_util::future::join(
36//! web_rpc::Interface::new(channel.port1()),
37//! web_rpc::Interface::new(channel.port2()),
38//! ).await;
39//!
40//! let server = web_rpc::Builder::new(server_iface)
41//! .with_service::<CalculatorService<_>>(Calc)
42//! .build();
43//! wasm_bindgen_futures::spawn_local(server);
44//!
45//! let client = web_rpc::Builder::new(client_iface)
46//! .with_client::<CalculatorClient>()
47//! .build();
48//! assert_eq!(client.add(41, 1).await, 42);
49//! # }
50//! ```
51//!
52//! # Routing
53//! ```rust
54//! #[web_rpc::service]
55//! pub trait Routing {
56//! // Plain types that implement Serialize go through bincode.
57//! fn add(&self, l: u32, r: u32) -> u32;
58//! // Anything `AsRef<JsValue>` is posted through the JS array.
59//! fn echo(&self, s: js_sys::JsString) -> js_sys::JsString;
60//! // `Option`/`Result` recurse: Ok(Some(_)) is posted, Ok(None) is one byte,
61//! // Err carries a bincoded `String`.
62//! fn lookup(&self, k: u32) -> Result<Option<js_sys::JsString>, String>;
63//! // `&str` / `&[u8]` deserialize zero-copy on the server.
64//! fn count(&self, data: &[u8]) -> usize;
65//! // References to JS types are accepted too and are decoded via JsCast::dyn_ref.
66//! fn len(&self, s: &js_sys::JsString) -> u32;
67//! }
68//! ```
69//!
70//! # Async, notifications, streaming
71//! ```rust
72//! use futures_core::Stream;
73//!
74//! #[web_rpc::service]
75//! pub trait Misc {
76//! // `async` here makes the server impl async; the client side is also async because we return a u32.
77//! async fn slow(&self, ms: u32) -> u32;
78//! // No return type means the method is a notification.
79//! fn fire(&self, msg: String);
80//! // `impl Stream<Item = T>` makes the method a streaming RPC.
81//! fn items(&self, n: u32) -> impl Stream<Item = u32>;
82//! }
83//! ```
84//! On the client side, RPC methods that have a return type are async and yield a
85//! [`client::RequestFuture<T>`] which you await for the response. Methods without a return type
86//! are sync and act as fire-and-forget notifications. This is independent of whether the trait
87//! method itself is marked `async`, which only affects the server implementation. Dropping the
88//! `RequestFuture` cancels the request, so notifications cannot be cancelled.
89//!
90//! Streaming methods return a [`client::StreamReceiver<T>`] that yields each item the server
91//! produces. Dropping the receiver aborts the stream on the server, while
92//! [`close`](client::StreamReceiver::close) lets buffered items finish arriving instead.
93//! Streaming methods can also be `async` and the items they yield can be wrapper types like
94//! `Result<JsT, E>`.
95//!
96//! # Transfer
97//! Anything that should be transferred to the other side rather than copied with the structured
98//! clone algorithm can be specified inside a `#[transfer(...)]` attribute as a comma-separated
99//! list. The simplest case is to list the parameter that holds the transferable value, but if
100//! that value is wrapped or derived from a parameter, you can use a parameter-name expression
101//! (`name => expr`, evaluated with `name` in scope), a closure with a refutable pattern
102//! (`name => |pat| body`), or a match-block (`name => match { arm, ... }`). The same forms also
103//! work for the return value via `return`.
104//! ```rust
105//! # use wasm_bindgen::JsCast;
106//! #[web_rpc::service]
107//! pub trait Transfer {
108//! // Bare param + derived expression + return closure.
109//! #[transfer(
110//! canvas,
111//! data => data.buffer(),
112//! return => |Ok(buf)| buf.buffer(),
113//! )]
114//! fn render(
115//! &self,
116//! canvas: web_sys::OffscreenCanvas,
117//! data: js_sys::Uint8Array,
118//! ) -> Result<js_sys::Uint8Array, String>;
119//!
120//! // Match-block: useful when several variants need transferring.
121//! #[transfer(return => match { Some(buf) => buf.buffer(), })]
122//! fn maybe(&self) -> Option<js_sys::Uint8Array>;
123//! }
124//! ```
125//!
126//! # Bi-directional
127//! Both sides of a channel can be set up to act as both client and server at the same time. To
128//! do this, stack [`with_service`](Builder::with_service) and
129//! [`with_client`](Builder::with_client) on the same [`Builder`] before calling `build()`, which
130//! then returns a `(C, Server)` tuple instead of one or the other.
131//! ```rust,no_run
132//! # #[web_rpc::service]
133//! # pub trait Calculator { fn add(&self, l: u32, r: u32) -> u32; }
134//! # struct Calc;
135//! # impl Calculator for Calc { fn add(&self, l: u32, r: u32) -> u32 { l + r } }
136//! # async fn run() {
137//! # let channel = web_sys::MessageChannel::new().unwrap();
138//! # let (iface, _) = futures_util::future::join(
139//! # web_rpc::Interface::new(channel.port1()),
140//! # web_rpc::Interface::new(channel.port2()),
141//! # ).await;
142//! let (client, server) = web_rpc::Builder::new(iface)
143//! .with_service::<CalculatorService<_>>(Calc)
144//! .with_client::<CalculatorClient>()
145//! .build();
146//! # }
147//! ```
148
149use std::{
150 cell::RefCell,
151 marker::PhantomData,
152 pin::Pin,
153 rc::Rc,
154 task::{Context, Poll},
155};
156
157use futures_channel::mpsc;
158use futures_core::{future::LocalBoxFuture, Future};
159use futures_util::{FutureExt, StreamExt};
160use gloo_events::EventListener;
161use js_sys::{ArrayBuffer, Uint8Array};
162use serde::{de::DeserializeOwned, Deserialize, Serialize};
163use wasm_bindgen::JsCast;
164
165#[doc(hidden)]
166pub use bincode;
167#[doc(hidden)]
168pub use futures_channel;
169#[doc(hidden)]
170pub use futures_core;
171#[doc(hidden)]
172pub use futures_util;
173#[doc(hidden)]
174pub use gloo_events;
175#[doc(hidden)]
176pub use js_sys;
177#[doc(hidden)]
178pub use pin_utils;
179#[doc(hidden)]
180pub use serde;
181#[doc(hidden)]
182pub use wasm_bindgen;
183
184pub use web_rpc_macro::service;
185
186pub mod client;
187#[doc(hidden)]
188pub mod codec;
189pub mod interface;
190pub mod port;
191#[doc(hidden)]
192pub mod service;
193
194pub use interface::Interface;
195
196#[doc(hidden)]
197#[derive(Serialize, Deserialize)]
198pub enum MessageHeader {
199 Request(usize),
200 Abort(usize),
201 Response(usize),
202 StreamItem(usize),
203 StreamEnd(usize),
204}
205
206/// This struct allows one to configure the RPC interface prior to creating it.
207/// To get an instance of this struct, call [`Builder<C, S>::new`] with
208/// an [`Interface`].
209pub struct Builder<C, S> {
210 client: PhantomData<C>,
211 service: S,
212 interface: Interface,
213}
214
215impl Builder<(), ()> {
216 /// Create a new builder from an [`Interface`]
217 pub fn new(interface: Interface) -> Self {
218 Self {
219 interface,
220 client: PhantomData::<()>,
221 service: (),
222 }
223 }
224}
225
226impl<C> Builder<C, ()> {
227 /// Configure the RPC interface with a service that implements methods
228 /// that can be called from the other side of the channel. To use this method,
229 /// you need to specify the type `S` which is the service type generated by the
230 /// attribute macro [`macro@service`]. The implementation parameter is then an
231 /// instance of something that implements the trait to which to applied the
232 /// [`macro@service`] macro. For example, if you have a trait `Calculator` to
233 /// which you have applied [`macro@service`], you would use this method as follows:
234 /// ```rust,no_run
235 /// # #[web_rpc::service]
236 /// # pub trait Calculator {
237 /// # fn add(&self, left: u32, right: u32) -> u32;
238 /// # }
239 /// # struct CalculatorServiceImpl;
240 /// # impl Calculator for CalculatorServiceImpl {
241 /// # fn add(&self, left: u32, right: u32) -> u32 { left + right }
242 /// # }
243 /// # fn example(some_interface: web_rpc::Interface) {
244 /// let server = web_rpc::Builder::new(some_interface)
245 /// .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
246 /// .build();
247 /// # }
248 /// ```
249 pub fn with_service<S: service::Service>(self, implementation: impl Into<S>) -> Builder<C, S> {
250 let service = implementation.into();
251 let Builder {
252 interface, client, ..
253 } = self;
254 Builder {
255 interface,
256 client,
257 service,
258 }
259 }
260}
261
262impl<S> Builder<(), S> {
263 /// Configure the RPC interface with a client that allows you to execute RPCs on the
264 /// server. The builder will automatically instansiate the client for you, you just
265 /// need to provide the type which is generated via the [`macro@service`] attribute
266 /// macro. For example, if you had a trait `Calculator` to which you applied the
267 /// [`macro@service`] attribute macro, the macro would have generated a `CalculatorClient`
268 /// struct which you can use as the `C` in this function.
269 pub fn with_client<C: client::Client>(self) -> Builder<C, S> {
270 let Builder {
271 interface, service, ..
272 } = self;
273 Builder {
274 interface,
275 client: PhantomData::<C>,
276 service,
277 }
278 }
279}
280
281/// `Server` is the server that is returned from the [`Builder::build`] method given
282/// you configured the RPC interface with a service. Note that `Server` implements future and needs
283/// to be polled in order to execute and respond to inbound RPC requests.
284#[must_use = "Server must be polled in order for RPC requests to be executed"]
285pub struct Server {
286 _listener: Rc<EventListener>,
287 task: LocalBoxFuture<'static, ()>,
288}
289
290impl Future for Server {
291 type Output = ();
292
293 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
294 self.task.poll_unpin(cx)
295 }
296}
297
298impl<C> Builder<C, ()>
299where
300 C: client::Client + From<client::Configuration<C::Response>> + 'static,
301 <C as client::Client>::Response: DeserializeOwned,
302{
303 /// Build function for client-only RPC interfaces.
304 pub fn build(self) -> C {
305 let Builder {
306 interface:
307 Interface {
308 port,
309 listener,
310 mut messages_rx,
311 },
312 ..
313 } = self;
314 let client_callback_map: Rc<RefCell<client::CallbackMap<C::Response>>> = Default::default();
315 let client_callback_map_cloned = client_callback_map.clone();
316 let stream_callback_map: Rc<RefCell<client::StreamCallbackMap<C::Response>>> =
317 Default::default();
318 let stream_callback_map_cloned = stream_callback_map.clone();
319 let dispatcher = async move {
320 while let Some(array) = messages_rx.next().await {
321 let header_bytes =
322 Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
323 let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
324 match header {
325 MessageHeader::Response(seq_id) => {
326 let payload_bytes =
327 Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
328 .to_vec();
329 let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
330 if let Some(callback_tx) =
331 client_callback_map_cloned.borrow_mut().remove(&seq_id)
332 {
333 let _ = callback_tx.send((response, array));
334 }
335 }
336 MessageHeader::StreamItem(seq_id) => {
337 let payload_bytes =
338 Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
339 .to_vec();
340 let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
341 if let Some(tx) = stream_callback_map_cloned.borrow().get(&seq_id) {
342 let _ = tx.unbounded_send((response, array));
343 }
344 }
345 MessageHeader::StreamEnd(seq_id) => {
346 stream_callback_map_cloned.borrow_mut().remove(&seq_id);
347 }
348 _ => panic!("client received a server message"),
349 }
350 }
351 }
352 .boxed_local()
353 .shared();
354 let port_cloned = port.clone();
355 let abort_sender = move |seq_id: usize| {
356 let header = MessageHeader::Abort(seq_id);
357 let header_bytes = bincode::serialize(&header).unwrap();
358 let buffer = js_sys::Uint8Array::from(&header_bytes[..]).buffer();
359 let post_args = js_sys::Array::of1(&buffer);
360 let transfer_args = js_sys::Array::of1(&buffer);
361 port_cloned
362 .post_message(&post_args, &transfer_args)
363 .unwrap();
364 };
365 C::from((
366 client_callback_map,
367 stream_callback_map,
368 port,
369 Rc::new(listener),
370 dispatcher,
371 Rc::new(abort_sender),
372 ))
373 }
374}
375
376impl<S> Builder<(), S>
377where
378 S: service::Service + 'static,
379 <S as service::Service>::Response: Serialize,
380{
381 /// Build function for server-only RPC interfaces.
382 pub fn build(self) -> Server {
383 let Builder {
384 service,
385 interface:
386 Interface {
387 port,
388 listener,
389 mut messages_rx,
390 },
391 ..
392 } = self;
393 let (server_requests_tx, server_requests_rx) = mpsc::unbounded();
394 let (abort_requests_tx, abort_requests_rx) = mpsc::unbounded();
395 let dispatcher = async move {
396 while let Some(array) = messages_rx.next().await {
397 let header_bytes =
398 Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
399 let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
400 match header {
401 MessageHeader::Request(seq_id) => {
402 let payload =
403 Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
404 .to_vec();
405 server_requests_tx
406 .unbounded_send((seq_id, payload, array))
407 .unwrap();
408 }
409 MessageHeader::Abort(seq_id) => {
410 abort_requests_tx.unbounded_send(seq_id).unwrap();
411 }
412 _ => panic!("server received a client message"),
413 }
414 }
415 }
416 .boxed_local()
417 .shared();
418 Server {
419 _listener: Rc::new(listener),
420 task: service::task::<S>(
421 service,
422 port,
423 dispatcher,
424 server_requests_rx,
425 abort_requests_rx,
426 )
427 .boxed_local(),
428 }
429 }
430}
431
432impl<C, S> Builder<C, S>
433where
434 C: client::Client + From<client::Configuration<C::Response>> + 'static,
435 S: service::Service + 'static,
436 <S as service::Service>::Response: Serialize,
437 <C as client::Client>::Response: DeserializeOwned,
438{
439 /// Build function for client-server RPC interfaces.
440 pub fn build(self) -> (C, Server) {
441 let Builder {
442 service: server,
443 interface:
444 Interface {
445 port,
446 listener,
447 mut messages_rx,
448 },
449 ..
450 } = self;
451 let client_callback_map: Rc<RefCell<client::CallbackMap<C::Response>>> = Default::default();
452 let stream_callback_map: Rc<RefCell<client::StreamCallbackMap<C::Response>>> =
453 Default::default();
454 let (server_requests_tx, server_requests_rx) = mpsc::unbounded();
455 let (abort_requests_tx, abort_requests_rx) = mpsc::unbounded();
456 let client_callback_map_cloned = client_callback_map.clone();
457 let stream_callback_map_cloned = stream_callback_map.clone();
458 let dispatcher = async move {
459 while let Some(array) = messages_rx.next().await {
460 let header_bytes =
461 Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
462 let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
463 match header {
464 MessageHeader::Response(seq_id) => {
465 let payload_bytes =
466 Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
467 .to_vec();
468 let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
469 if let Some(callback_tx) =
470 client_callback_map_cloned.borrow_mut().remove(&seq_id)
471 {
472 let _ = callback_tx.send((response, array));
473 }
474 }
475 MessageHeader::StreamItem(seq_id) => {
476 let payload_bytes =
477 Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
478 .to_vec();
479 let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
480 if let Some(tx) = stream_callback_map_cloned.borrow().get(&seq_id) {
481 let _ = tx.unbounded_send((response, array));
482 }
483 }
484 MessageHeader::StreamEnd(seq_id) => {
485 stream_callback_map_cloned.borrow_mut().remove(&seq_id);
486 }
487 MessageHeader::Request(seq_id) => {
488 let payload =
489 Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
490 .to_vec();
491 server_requests_tx
492 .unbounded_send((seq_id, payload, array))
493 .unwrap();
494 }
495 MessageHeader::Abort(seq_id) => {
496 abort_requests_tx.unbounded_send(seq_id).unwrap();
497 }
498 }
499 }
500 }
501 .boxed_local()
502 .shared();
503 let port_cloned = port.clone();
504 let abort_sender = move |seq_id: usize| {
505 let header = MessageHeader::Abort(seq_id);
506 let header_bytes = bincode::serialize(&header).unwrap();
507 let buffer = js_sys::Uint8Array::from(&header_bytes[..]).buffer();
508 let post_args = js_sys::Array::of1(&buffer);
509 let transfer_args = js_sys::Array::of1(&buffer);
510 port_cloned
511 .post_message(&post_args, &transfer_args)
512 .unwrap();
513 };
514 let listener = Rc::new(listener);
515 let client = C::from((
516 client_callback_map,
517 stream_callback_map,
518 port.clone(),
519 listener.clone(),
520 dispatcher.clone(),
521 Rc::new(abort_sender),
522 ));
523 let server = Server {
524 _listener: listener,
525 task: service::task::<S>(
526 server,
527 port,
528 dispatcher,
529 server_requests_rx,
530 abort_requests_rx,
531 )
532 .boxed_local(),
533 };
534 (client, server)
535 }
536}