Skip to main content

web_rpc/
lib.rs

1//! The `web-rpc` create is a library for performing RPCs (remote proceedure calls) between
2//! browsing contexts, web workers, and channels. It allows you to define an RPC using a trait
3//! similar to Google's [tarpc](https://github.com/google/tarpc) and will transparently
4//! handle the serialization and deserialization of the arguments. Moreover, it can post
5//! anything that implements [`AsRef<JsValue>`](https://docs.rs/wasm-bindgen/latest/wasm_bindgen/struct.JsValue.html) and also supports transferring ownership.
6//!
7//! ## Quick start
8//! To get started define a trait for your RPC service as follows. Annnotate this trait with the
9//! `service` procedural macro that is exported by this crate:
10//! ```rust
11//! #[web_rpc::service]
12//! pub trait Calculator {
13//!     fn add(&self, left: u32, right: u32) -> u32;
14//! }
15//! ```
16//! This macro will generate the structs `CalculatorClient`, `CalculatorService`, and a new trait
17//! `Calculator` that you can use to implement the service as follows:
18//! ```rust
19//! # #[web_rpc::service]
20//! # pub trait Calculator {
21//! #     fn add(&self, left: u32, right: u32) -> u32;
22//! # }
23//! struct CalculatorServiceImpl;
24//!
25//! impl Calculator for CalculatorServiceImpl {
26//!     fn add(&self, left: u32, right: u32) -> u32 {
27//!         left + right
28//!     }
29//! }
30//! ```
31//! Note that the `&self` receiver is required in the trait definition. Although not
32//! used in this example, this is useful when we want the RPC to modify some state (via interior
33//! mutability). Now that we have defined our RPC, let's create a client and server for it! In this
34//! example, we will use [`MessageChannel`](https://docs.rs/web-sys/latest/web_sys/struct.MessageChannel.html)
35//! since it is easy to construct and test, however, a more common case would be to construct the
36//! channel from a [`Worker`](https://docs.rs/web-sys/latest/web_sys/struct.Worker.html) or a
37//! [`DedicatedWorkerGlobalScope`](https://docs.rs/web-sys/latest/web_sys/struct.DedicatedWorkerGlobalScope.html).
38//! Let's start by defining the server:
39//! ```rust,no_run
40//! # #[web_rpc::service]
41//! # pub trait Calculator {
42//! #     fn add(&self, left: u32, right: u32) -> u32;
43//! # }
44//! # struct CalculatorServiceImpl;
45//! # impl Calculator for CalculatorServiceImpl {
46//! #     fn add(&self, left: u32, right: u32) -> u32 { left + right }
47//! # }
48//! # async fn example() {
49//! // create a MessageChannel
50//! let channel = web_sys::MessageChannel::new().unwrap();
51//! // Create two interfaces from the ports
52//! let (server_interface, client_interface) = futures_util::future::join(
53//!     web_rpc::Interface::new(channel.port1()),
54//!     web_rpc::Interface::new(channel.port2()),
55//! ).await;
56//! // create a server with the first interface
57//! let server = web_rpc::Builder::new(server_interface)
58//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
59//!     .build();
60//! // spawn the server
61//! wasm_bindgen_futures::spawn_local(server);
62//! # }
63//! ```
64//! [`Interface::new`] is async since there is no way to synchronously check whether a channel or
65//! a worker is ready to receive messages. To workaround this, temporary listeners are attached to
66//! determine when a channel is ready for communication. The server returned by the build method is
67//! a future that can be added to the browser's event loop using
68//! [`wasm_bindgen_futures::spawn_local`], however, this will run the server indefinitely. For more
69//! control, consider wrapping the server with [`futures_util::FutureExt::remote_handle`] before
70//! spawning it, which will shutdown the server once the handle has been dropped. Moving onto the
71//! client:
72//! ```rust,no_run
73//! # #[web_rpc::service]
74//! # pub trait Calculator {
75//! #     fn add(&self, left: u32, right: u32) -> u32;
76//! # }
77//! # async fn example(client_interface: web_rpc::Interface) {
78//! // create a client using the second interface
79//! let client = web_rpc::Builder::new(client_interface)
80//!     .with_client::<CalculatorClient>()
81//!     .build();
82//! /* call `add` */
83//! assert_eq!(client.add(41, 1).await, 42);
84//! # }
85//! ```
86//! That is it! Underneath the hood, the client will serialize its arguments using bincode and
87//! transfer the bytes to server. The server will deserialize those arguments and run
88//! `<CalculatorServiceImpl as Calculator>::add` before returning the result to the client. Note
89//! that we are only awaiting the response of the call to `add`, the request itself is sent
90//! synchronously before we await anything.
91//!
92//! ## Advanced examples
93//! Now that we have the basic idea of how define an RPC trait and set up a server and client, let's
94//! dive into some of the more advanced features of this library!
95//!
96//! ### Synchronous and asynchronous RPC methods
97//! Server methods can be asynchronous! That is, you can define the following RPC trait and service
98//! implementation:
99//! ```rust,no_run
100//! # use std::time::Duration;
101//! #[web_rpc::service]
102//! pub trait Sleep {
103//!     async fn sleep(&self, interval: Duration);
104//! }
105//!
106//! struct SleepServiceImpl;
107//! impl Sleep for SleepServiceImpl {
108//!     async fn sleep(&self, interval: Duration) {
109//!         gloo_timers::future::sleep(interval).await;
110//!     }
111//! }
112//! ```
113//! Asynchronous RPC methods are run concurrently on the server and also support cancellation if the
114//! future on the client side is dropped. However, such a future is only returned from a client
115//! method if the RPC returns a value. Otherwise the RPC is considered a notification.
116//!
117//! ### Notifications
118//! Notifications are RPCs that do not return anything. On the client side, the method is completely
119//! synchronous and also returns nothing. This setup is useful if you need to communicate with
120//! another part of your application but cannot yield to the event loop.
121//!
122//! The implication of this, however, is that even if the server method is asynchronous, we are
123//! unable to cancel it from the client side since we do not have a future that can be dropped.
124//!
125//! ### Posting and transferring Javascript types
126//! In the example above, we discussed how the client serializes its arguments before sending them
127//! to the server. This approach is convenient, but how do send web types such as a
128//! `WebAssembly.Module` or an `OffscreenCanvas` that have no serializable representation? Well, we
129//! are in luck since this happens to be one of the key features of this crate. Consider the
130//! following RPC trait:
131//! ```rust
132//! #[web_rpc::service]
133//! pub trait Concat {
134//!     #[post(left, right, return)]
135//!     fn concat_with_space(
136//!         &self,
137//!         left: js_sys::JsString,
138//!         right: js_sys::JsString
139//!     ) -> js_sys::JsString;
140//! }
141//! ```
142//! All we have done is added the `post` attribute to the method and listed the arguments that we
143//! would like to be posted to the other side. Under the hood, the implementation of the client will
144//! then skip these arguments during serialization and just append them after the serialized message
145//! to the array that will be posted. As shown above, this also works for the return type by just
146//! specifying `return` in the post attribute. For web types that need to be transferred, we simply
147//! wrap them in `transfer` as follows:
148//! ```rust
149//! #[web_rpc::service]
150//! pub trait GameEngine {
151//!     #[post(transfer(canvas))]
152//!     fn send_canvas(
153//!         &self,
154//!         canvas: web_sys::OffscreenCanvas,
155//!     );
156//! }
157//! ```
158//! ### Optional and fallible JavaScript types
159//! Posted JavaScript types can be wrapped in `Option` or `Result` to handle cases where
160//! a value may be absent or an operation may fail. This works for both arguments and
161//! return types, including streaming methods. When the value is `Some` or `Ok`, the
162//! JavaScript object is posted as usual. When the value is `None` or `Err`, no JavaScript
163//! object is sent — only a serialized discriminant travels over the wire.
164//!
165//! For example, a method that optionally returns a JavaScript string:
166//! ```rust
167//! #[web_rpc::service]
168//! pub trait Lookup {
169//!     #[post(return)]
170//!     fn find(&self, key: u32) -> Option<js_sys::JsString>;
171//! }
172//!
173//! struct LookupImpl;
174//! impl Lookup for LookupImpl {
175//!     fn find(&self, key: u32) -> Option<js_sys::JsString> {
176//!         if key == 42 {
177//!             Some(js_sys::JsString::from("found it"))
178//!         } else {
179//!             None
180//!         }
181//!     }
182//! }
183//! ```
184//! The client receives `RequestFuture<Option<js_sys::JsString>>` and can check
185//! whether the server returned a value.
186//!
187//! Similarly, a method that returns a `Result` where both the `Ok` and `Err` types
188//! are JavaScript objects can use `#[post(return)]` — both variants are posted:
189//! ```rust
190//! #[web_rpc::service]
191//! pub trait Parser {
192//!     #[post(return)]
193//!     fn parse(&self, input: String) -> Result<js_sys::JsString, js_sys::Error>;
194//! }
195//!
196//! struct ParserImpl;
197//! impl Parser for ParserImpl {
198//!     fn parse(&self, input: String) -> Result<js_sys::JsString, js_sys::Error> {
199//!         if input.is_empty() {
200//!             Err(js_sys::Error::new("empty input"))
201//!         } else {
202//!             Ok(js_sys::JsString::from(input.as_str()))
203//!         }
204//!     }
205//! }
206//! ```
207//! Arguments can also be optional JavaScript types:
208//! ```rust
209//! #[web_rpc::service]
210//! pub trait Formatter {
211//!     #[post(label)]
212//!     fn format(&self, label: Option<js_sys::JsString>) -> String;
213//! }
214//! ```
215//! All of these wrappers combine with streaming methods too — for example,
216//! `impl Stream<Item = Result<js_sys::JsString, String>>` with `#[post(return)]`
217//! will stream `Result` values where the `Ok` variant carries a posted JavaScript
218//! object and the `Err` variant carries a serialized error.
219//!
220//! ### Borrowed parameters
221//! RPC methods can accept borrowed types such as `&str` and `&[u8]`, which are deserialized
222//! zero-copy on the server side:
223//! ```rust
224//! #[web_rpc::service]
225//! pub trait Greeter {
226//!     fn greet(&self, name: &str, greeting: &str) -> String;
227//!     fn count_bytes(&self, data: &[u8]) -> usize;
228//! }
229//!
230//! struct GreeterServiceImpl;
231//! impl Greeter for GreeterServiceImpl {
232//!     fn greet(&self, name: &str, greeting: &str) -> String {
233//!         format!("{greeting}, {name}!")
234//!     }
235//!     fn count_bytes(&self, data: &[u8]) -> usize {
236//!         data.len()
237//!     }
238//! }
239//! ```
240//! This avoids unnecessary allocations — the server deserializes directly from the received
241//! message bytes without copying into owned `String` or `Vec<u8>` types. On the client side,
242//! borrowed parameters are serialized inline before the method returns, so standard Rust
243//! lifetime rules apply. Note that only types with serde borrowing support (`&str`, `&[u8]`)
244//! benefit from zero-copy deserialization.
245//!
246//! ### Streaming
247//! Methods can return a stream of items using `impl Stream<Item = T>` as the return type.
248//! The macro detects this and generates the appropriate client and server code. On the client
249//! side, the method returns a [`client::StreamReceiver<T>`] which implements
250//! [`futures_core::Stream`]. On the server side, the return type is preserved as-is:
251//! ```rust
252//! #[web_rpc::service]
253//! pub trait DataSource {
254//!     fn stream_data(&self, count: u32) -> impl futures_core::Stream<Item = u32>;
255//! }
256//!
257//! struct DataSourceImpl;
258//! impl DataSource for DataSourceImpl {
259//!     fn stream_data(&self, count: u32) -> impl futures_core::Stream<Item = u32> {
260//!         let (tx, rx) = futures_channel::mpsc::unbounded();
261//!         for i in 0..count {
262//!             let _ = tx.unbounded_send(i);
263//!         }
264//!         rx
265//!     }
266//! }
267//! ```
268//! Dropping the [`client::StreamReceiver`] sends an abort signal to the server, cancelling the
269//! stream. Alternatively, calling [`close`](client::StreamReceiver::close) stops the server
270//! while still allowing buffered items to be drained. Streaming methods can also be async and
271//! can be combined with the `#[post(return)]` attribute for streaming JavaScript types.
272//!
273//! ### Bi-directional RPC
274//! In the original example, we created a server on the first port of the message channel and a
275//! client on the second port. However, it is possible to define both a client and a server on each
276//! side, enabling bi-directional RPC. This is particularly useful if we want to send and receive
277//! messages from a worker without sending it a seperate channel for the bi-directional
278//! communication. Our original example can be extended as follows:
279//! ```rust,no_run
280//! # #[web_rpc::service]
281//! # pub trait Calculator {
282//! #     fn add(&self, left: u32, right: u32) -> u32;
283//! # }
284//! # struct CalculatorServiceImpl;
285//! # impl Calculator for CalculatorServiceImpl {
286//! #     fn add(&self, left: u32, right: u32) -> u32 { left + right }
287//! # }
288//! # async fn example() {
289//! /* create channel */
290//! let channel = web_sys::MessageChannel::new().unwrap();
291//! let (interface1, interface2) = futures_util::future::join(
292//!     web_rpc::Interface::new(channel.port1()),
293//!     web_rpc::Interface::new(channel.port2()),
294//! ).await;
295//! /* create server1 and client1 */
296//! let (client1, server1) = web_rpc::Builder::new(interface1)
297//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
298//!     .with_client::<CalculatorClient>()
299//!     .build();
300//! /* create server2 and client2 */
301//! let (client2, server2) = web_rpc::Builder::new(interface2)
302//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
303//!     .with_client::<CalculatorClient>()
304//!     .build();
305//! # }
306//! ```
307
308use std::{
309    cell::RefCell,
310    marker::PhantomData,
311    pin::Pin,
312    rc::Rc,
313    task::{Context, Poll},
314};
315
316use futures_channel::mpsc;
317use futures_core::{future::LocalBoxFuture, Future};
318use futures_util::{FutureExt, StreamExt};
319use gloo_events::EventListener;
320use js_sys::{ArrayBuffer, Uint8Array};
321use serde::{de::DeserializeOwned, Deserialize, Serialize};
322use wasm_bindgen::JsCast;
323
324#[doc(hidden)]
325pub use bincode;
326#[doc(hidden)]
327pub use futures_channel;
328#[doc(hidden)]
329pub use futures_core;
330#[doc(hidden)]
331pub use futures_util;
332#[doc(hidden)]
333pub use gloo_events;
334#[doc(hidden)]
335pub use js_sys;
336#[doc(hidden)]
337pub use pin_utils;
338#[doc(hidden)]
339pub use serde;
340#[doc(hidden)]
341pub use wasm_bindgen;
342
343pub use web_rpc_macro::service;
344
345pub mod client;
346pub mod interface;
347pub mod port;
348#[doc(hidden)]
349pub mod service;
350
351pub use interface::Interface;
352
353#[doc(hidden)]
354#[derive(Serialize, Deserialize)]
355pub enum MessageHeader {
356    Request(usize),
357    Abort(usize),
358    Response(usize),
359    StreamItem(usize),
360    StreamEnd(usize),
361}
362
363/// This struct allows one to configure the RPC interface prior to creating it.
364/// To get an instance of this struct, call [`Builder<C, S>::new`] with
365/// an [`Interface`].
366pub struct Builder<C, S> {
367    client: PhantomData<C>,
368    service: S,
369    interface: Interface,
370}
371
372impl Builder<(), ()> {
373    /// Create a new builder from an [`Interface`]
374    pub fn new(interface: Interface) -> Self {
375        Self {
376            interface,
377            client: PhantomData::<()>,
378            service: (),
379        }
380    }
381}
382
383impl<C> Builder<C, ()> {
384    /// Configure the RPC interface with a service that implements methods
385    /// that can be called from the other side of the channel. To use this method,
386    /// you need to specify the type `S` which is the service type generated by the
387    /// attribute macro [`macro@service`]. The implementation parameter is then an
388    /// instance of something that implements the trait to which to applied the
389    /// [`macro@service`] macro. For example, if you have a trait `Calculator` to
390    /// which you have applied [`macro@service`], you would use this method as follows:
391    /// ```rust,no_run
392    /// # #[web_rpc::service]
393    /// # pub trait Calculator {
394    /// #     fn add(&self, left: u32, right: u32) -> u32;
395    /// # }
396    /// # struct CalculatorServiceImpl;
397    /// # impl Calculator for CalculatorServiceImpl {
398    /// #     fn add(&self, left: u32, right: u32) -> u32 { left + right }
399    /// # }
400    /// # fn example(some_interface: web_rpc::Interface) {
401    /// let server = web_rpc::Builder::new(some_interface)
402    ///     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
403    ///     .build();
404    /// # }
405    /// ```
406    pub fn with_service<S: service::Service>(self, implementation: impl Into<S>) -> Builder<C, S> {
407        let service = implementation.into();
408        let Builder {
409            interface, client, ..
410        } = self;
411        Builder {
412            interface,
413            client,
414            service,
415        }
416    }
417}
418
419impl<S> Builder<(), S> {
420    /// Configure the RPC interface with a client that allows you to execute RPCs on the
421    /// server. The builder will automatically instansiate the client for you, you just
422    /// need to provide the type which is generated via the [`macro@service`] attribute
423    /// macro. For example, if you had a trait `Calculator` to which you applied the
424    /// [`macro@service`] attribute macro, the macro would have generated a `CalculatorClient`
425    /// struct which you can use as the `C` in this function.
426    pub fn with_client<C: client::Client>(self) -> Builder<C, S> {
427        let Builder {
428            interface, service, ..
429        } = self;
430        Builder {
431            interface,
432            client: PhantomData::<C>,
433            service,
434        }
435    }
436}
437
438/// `Server` is the server that is returned from the [`Builder::build`] method given
439/// you configured the RPC interface with a service. Note that `Server` implements future and needs
440/// to be polled in order to execute and respond to inbound RPC requests.
441#[must_use = "Server must be polled in order for RPC requests to be executed"]
442pub struct Server {
443    _listener: Rc<EventListener>,
444    task: LocalBoxFuture<'static, ()>,
445}
446
447impl Future for Server {
448    type Output = ();
449
450    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
451        self.task.poll_unpin(cx)
452    }
453}
454
455impl<C> Builder<C, ()>
456where
457    C: client::Client + From<client::Configuration<C::Response>> + 'static,
458    <C as client::Client>::Response: DeserializeOwned,
459{
460    /// Build function for client-only RPC interfaces.
461    pub fn build(self) -> C {
462        let Builder {
463            interface:
464                Interface {
465                    port,
466                    listener,
467                    mut messages_rx,
468                },
469            ..
470        } = self;
471        let client_callback_map: Rc<RefCell<client::CallbackMap<C::Response>>> = Default::default();
472        let client_callback_map_cloned = client_callback_map.clone();
473        let stream_callback_map: Rc<RefCell<client::StreamCallbackMap<C::Response>>> =
474            Default::default();
475        let stream_callback_map_cloned = stream_callback_map.clone();
476        let dispatcher = async move {
477            while let Some(array) = messages_rx.next().await {
478                let header_bytes =
479                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
480                let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
481                match header {
482                    MessageHeader::Response(seq_id) => {
483                        let payload_bytes =
484                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
485                                .to_vec();
486                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
487                        if let Some(callback_tx) =
488                            client_callback_map_cloned.borrow_mut().remove(&seq_id)
489                        {
490                            let _ = callback_tx.send((response, array));
491                        }
492                    }
493                    MessageHeader::StreamItem(seq_id) => {
494                        let payload_bytes =
495                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
496                                .to_vec();
497                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
498                        if let Some(tx) = stream_callback_map_cloned.borrow().get(&seq_id) {
499                            let _ = tx.unbounded_send((response, array));
500                        }
501                    }
502                    MessageHeader::StreamEnd(seq_id) => {
503                        stream_callback_map_cloned.borrow_mut().remove(&seq_id);
504                    }
505                    _ => panic!("client received a server message"),
506                }
507            }
508        }
509        .boxed_local()
510        .shared();
511        let port_cloned = port.clone();
512        let abort_sender = move |seq_id: usize| {
513            let header = MessageHeader::Abort(seq_id);
514            let header_bytes = bincode::serialize(&header).unwrap();
515            let buffer = js_sys::Uint8Array::from(&header_bytes[..]).buffer();
516            let post_args = js_sys::Array::of1(&buffer);
517            let transfer_args = js_sys::Array::of1(&buffer);
518            port_cloned
519                .post_message(&post_args, &transfer_args)
520                .unwrap();
521        };
522        C::from((
523            client_callback_map,
524            stream_callback_map,
525            port,
526            Rc::new(listener),
527            dispatcher,
528            Rc::new(abort_sender),
529        ))
530    }
531}
532
533impl<S> Builder<(), S>
534where
535    S: service::Service + 'static,
536    <S as service::Service>::Response: Serialize,
537{
538    /// Build function for server-only RPC interfaces.
539    pub fn build(self) -> Server {
540        let Builder {
541            service,
542            interface:
543                Interface {
544                    port,
545                    listener,
546                    mut messages_rx,
547                },
548            ..
549        } = self;
550        let (server_requests_tx, server_requests_rx) = mpsc::unbounded();
551        let (abort_requests_tx, abort_requests_rx) = mpsc::unbounded();
552        let dispatcher = async move {
553            while let Some(array) = messages_rx.next().await {
554                let header_bytes =
555                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
556                let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
557                match header {
558                    MessageHeader::Request(seq_id) => {
559                        let payload =
560                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
561                                .to_vec();
562                        server_requests_tx
563                            .unbounded_send((seq_id, payload, array))
564                            .unwrap();
565                    }
566                    MessageHeader::Abort(seq_id) => {
567                        abort_requests_tx.unbounded_send(seq_id).unwrap();
568                    }
569                    _ => panic!("server received a client message"),
570                }
571            }
572        }
573        .boxed_local()
574        .shared();
575        Server {
576            _listener: Rc::new(listener),
577            task: service::task::<S>(
578                service,
579                port,
580                dispatcher,
581                server_requests_rx,
582                abort_requests_rx,
583            )
584            .boxed_local(),
585        }
586    }
587}
588
589impl<C, S> Builder<C, S>
590where
591    C: client::Client + From<client::Configuration<C::Response>> + 'static,
592    S: service::Service + 'static,
593    <S as service::Service>::Response: Serialize,
594    <C as client::Client>::Response: DeserializeOwned,
595{
596    /// Build function for client-server RPC interfaces.
597    pub fn build(self) -> (C, Server) {
598        let Builder {
599            service: server,
600            interface:
601                Interface {
602                    port,
603                    listener,
604                    mut messages_rx,
605                },
606            ..
607        } = self;
608        let client_callback_map: Rc<RefCell<client::CallbackMap<C::Response>>> = Default::default();
609        let stream_callback_map: Rc<RefCell<client::StreamCallbackMap<C::Response>>> =
610            Default::default();
611        let (server_requests_tx, server_requests_rx) = mpsc::unbounded();
612        let (abort_requests_tx, abort_requests_rx) = mpsc::unbounded();
613        let client_callback_map_cloned = client_callback_map.clone();
614        let stream_callback_map_cloned = stream_callback_map.clone();
615        let dispatcher = async move {
616            while let Some(array) = messages_rx.next().await {
617                let header_bytes =
618                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
619                let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
620                match header {
621                    MessageHeader::Response(seq_id) => {
622                        let payload_bytes =
623                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
624                                .to_vec();
625                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
626                        if let Some(callback_tx) =
627                            client_callback_map_cloned.borrow_mut().remove(&seq_id)
628                        {
629                            let _ = callback_tx.send((response, array));
630                        }
631                    }
632                    MessageHeader::StreamItem(seq_id) => {
633                        let payload_bytes =
634                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
635                                .to_vec();
636                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
637                        if let Some(tx) = stream_callback_map_cloned.borrow().get(&seq_id) {
638                            let _ = tx.unbounded_send((response, array));
639                        }
640                    }
641                    MessageHeader::StreamEnd(seq_id) => {
642                        stream_callback_map_cloned.borrow_mut().remove(&seq_id);
643                    }
644                    MessageHeader::Request(seq_id) => {
645                        let payload =
646                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
647                                .to_vec();
648                        server_requests_tx
649                            .unbounded_send((seq_id, payload, array))
650                            .unwrap();
651                    }
652                    MessageHeader::Abort(seq_id) => {
653                        abort_requests_tx.unbounded_send(seq_id).unwrap();
654                    }
655                }
656            }
657        }
658        .boxed_local()
659        .shared();
660        let port_cloned = port.clone();
661        let abort_sender = move |seq_id: usize| {
662            let header = MessageHeader::Abort(seq_id);
663            let header_bytes = bincode::serialize(&header).unwrap();
664            let buffer = js_sys::Uint8Array::from(&header_bytes[..]).buffer();
665            let post_args = js_sys::Array::of1(&buffer);
666            let transfer_args = js_sys::Array::of1(&buffer);
667            port_cloned
668                .post_message(&post_args, &transfer_args)
669                .unwrap();
670        };
671        let listener = Rc::new(listener);
672        let client = C::from((
673            client_callback_map,
674            stream_callback_map,
675            port.clone(),
676            listener.clone(),
677            dispatcher.clone(),
678            Rc::new(abort_sender),
679        ));
680        let server = Server {
681            _listener: listener,
682            task: service::task::<S>(
683                server,
684                port,
685                dispatcher,
686                server_requests_rx,
687                abort_requests_rx,
688            )
689            .boxed_local(),
690        };
691        (client, server)
692    }
693}