web_rpc/lib.rs
//! The `web-rpc` crate is a library for performing RPCs (remote procedure calls) between
//! browsing contexts, web workers, and channels. It allows you to define an RPC using a trait
//! similar to Google's [tarpc](https://github.com/google/tarpc) and will transparently
//! handle the serialization and deserialization of the arguments. Moreover, it can post
//! anything that implements [`AsRef<JsValue>`](https://docs.rs/wasm-bindgen/latest/wasm_bindgen/struct.JsValue.html) and also supports transferring ownership.
//!
//! ## Quick start
//! To get started, define a trait for your RPC service as follows. Annotate this trait with the
//! `service` procedural macro that is exported by this crate:
//! ```rust
//! #[web_rpc::service]
//! pub trait Calculator {
//!     fn add(&self, left: u32, right: u32) -> u32;
//! }
//! ```
//! This macro will generate the structs `CalculatorClient`, `CalculatorService`, and a new trait
//! `Calculator` that you can use to implement the service as follows:
//! ```rust
//! # #[web_rpc::service]
//! # pub trait Calculator {
//! #     fn add(&self, left: u32, right: u32) -> u32;
//! # }
//! struct CalculatorServiceImpl;
//!
//! impl Calculator for CalculatorServiceImpl {
//!     fn add(&self, left: u32, right: u32) -> u32 {
//!         left + right
//!     }
//! }
//! ```
//! Note that the `&self` receiver is required in the trait definition. Although not
//! used in this example, this is useful when we want the RPC to modify some state (via interior
//! mutability). Now that we have defined our RPC, let's create a client and server for it! In this
//! example, we will use [`MessageChannel`](https://docs.rs/web-sys/latest/web_sys/struct.MessageChannel.html)
//! since it is easy to construct and test; however, a more common case would be to construct the
//! channel from a [`Worker`](https://docs.rs/web-sys/latest/web_sys/struct.Worker.html) or a
//! [`DedicatedWorkerGlobalScope`](https://docs.rs/web-sys/latest/web_sys/struct.DedicatedWorkerGlobalScope.html).
//! Let's start by defining the server:
//! ```rust,no_run
//! # #[web_rpc::service]
//! # pub trait Calculator {
//! #     fn add(&self, left: u32, right: u32) -> u32;
//! # }
//! # struct CalculatorServiceImpl;
//! # impl Calculator for CalculatorServiceImpl {
//! #     fn add(&self, left: u32, right: u32) -> u32 { left + right }
//! # }
//! # async fn example() {
//! // create a MessageChannel
//! let channel = web_sys::MessageChannel::new().unwrap();
//! // create two interfaces from the ports
//! let (server_interface, client_interface) = futures_util::future::join(
//!     web_rpc::Interface::new(channel.port1()),
//!     web_rpc::Interface::new(channel.port2()),
//! ).await;
//! // create a server with the first interface
//! let server = web_rpc::Builder::new(server_interface)
//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
//!     .build();
//! // spawn the server
//! wasm_bindgen_futures::spawn_local(server);
//! # }
//! ```
//! [`Interface::new`] is async since there is no way to synchronously check whether a channel or
//! a worker is ready to receive messages. To work around this, temporary listeners are attached to
//! determine when a channel is ready for communication. The server returned by the `build` method
//! is a future that can be added to the browser's event loop using
//! [`wasm_bindgen_futures::spawn_local`]; however, this will run the server indefinitely. For more
//! control, consider wrapping the server with [`futures_util::FutureExt::remote_handle`] before
//! spawning it, which will shut down the server once the handle has been dropped. Moving on to the
//! client:
//! ```rust,no_run
//! # #[web_rpc::service]
//! # pub trait Calculator {
//! #     fn add(&self, left: u32, right: u32) -> u32;
//! # }
//! # async fn example(client_interface: web_rpc::Interface) {
//! // create a client using the second interface
//! let client = web_rpc::Builder::new(client_interface)
//!     .with_client::<CalculatorClient>()
//!     .build();
//! /* call `add` */
//! assert_eq!(client.add(41, 1).await, 42);
//! # }
//! ```
//! That is it! Under the hood, the client will serialize its arguments using bincode and
//! transfer the bytes to the server. The server will deserialize those arguments and run
//! `<CalculatorServiceImpl as Calculator>::add` before returning the result to the client. Note
//! that we are only awaiting the response of the call to `add`; the request itself is sent
//! synchronously before we await anything.
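//!
//! To make the request and response flow concrete, here is a simplified, std-only model of how a
//! reply can be matched to its caller by sequence number. It is a sketch loosely modelled on the
//! `MessageHeader` enum in this file, not the crate's implementation: the names `Header`,
//! `PendingCalls`, `register`, and `dispatch` are hypothetical, and a plain `u32` stands in for
//! the bincode-encoded payload.
//!
//! ```rust
//! use std::collections::HashMap;
//! use std::sync::mpsc::{channel, Receiver, Sender};
//!
//! /// Hypothetical message header: every message carries the sequence
//! /// number of the call it belongs to.
//! #[derive(Debug, Clone, Copy, PartialEq)]
//! pub enum Header {
//!     Request(usize),
//!     Response(usize),
//! }
//!
//! /// Hypothetical client-side table of calls that still wait for a reply.
//! #[derive(Default)]
//! pub struct PendingCalls {
//!     next_seq_id: usize,
//!     waiting: HashMap<usize, Sender<u32>>,
//! }
//!
//! impl PendingCalls {
//!     /// Allocate a sequence number and return the header to send together
//!     /// with the receiver on which the reply will arrive.
//!     pub fn register(&mut self) -> (Header, Receiver<u32>) {
//!         let seq_id = self.next_seq_id;
//!         self.next_seq_id += 1;
//!         let (tx, rx) = channel();
//!         self.waiting.insert(seq_id, tx);
//!         (Header::Request(seq_id), rx)
//!     }
//!
//!     /// Route an incoming reply to the caller that is waiting on it.
//!     /// Returns `false` if nobody is waiting for that sequence number.
//!     pub fn dispatch(&mut self, header: Header, payload: u32) -> bool {
//!         match header {
//!             Header::Response(seq_id) => match self.waiting.remove(&seq_id) {
//!                 Some(tx) => tx.send(payload).is_ok(),
//!                 None => false,
//!             },
//!             Header::Request(_) => false,
//!         }
//!     }
//! }
//! ```
//!
//! Because each reply names its sequence number, replies may arrive in any order and still reach
//! the right caller.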
//!
//! ## Advanced examples
//! Now that we have the basic idea of how to define an RPC trait and set up a server and client,
//! let's dive into some of the more advanced features of this library!
//!
//! ### Synchronous and asynchronous RPC methods
//! Server methods can be asynchronous! That is, you can define the following RPC trait and service
//! implementation:
//! ```rust,no_run
//! # use std::time::Duration;
//! #[web_rpc::service]
//! pub trait Sleep {
//!     async fn sleep(&self, interval: Duration);
//! }
//!
//! struct SleepServiceImpl;
//! impl Sleep for SleepServiceImpl {
//!     async fn sleep(&self, interval: Duration) {
//!         gloo_timers::future::sleep(interval).await;
//!     }
//! }
//! ```
//! Asynchronous RPC methods are run concurrently on the server and also support cancellation if the
//! future on the client side is dropped. However, such a future is only returned from a client
//! method if the RPC returns a value. Otherwise the RPC is considered a notification.
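//!
//! Cancellation on drop can be pictured with a small guard type. The sketch below is std-only and
//! hypothetical (`CallGuard`, `complete`, and `aborted_calls` are not part of this crate, and the
//! real client future may be built differently): it shows the general pattern of running an abort
//! callback when a pending call is dropped before it completes.
//!
//! ```rust
//! use std::cell::RefCell;
//! use std::rc::Rc;
//!
//! /// Hypothetical handle for a call that is still in flight.
//! pub struct CallGuard {
//!     seq_id: usize,
//!     completed: bool,
//!     abort: Rc<dyn Fn(usize)>,
//! }
//!
//! impl CallGuard {
//!     pub fn new(seq_id: usize, abort: Rc<dyn Fn(usize)>) -> Self {
//!         Self { seq_id, completed: false, abort }
//!     }
//!
//!     /// Mark the call as finished so that dropping it no longer aborts.
//!     pub fn complete(mut self) {
//!         self.completed = true;
//!     }
//! }
//!
//! impl Drop for CallGuard {
//!     fn drop(&mut self) {
//!         // Only notify the other side if the call never finished.
//!         if !self.completed {
//!             (self.abort)(self.seq_id);
//!         }
//!     }
//! }
//!
//! /// Drop one call early and complete another; returns the aborted sequence numbers.
//! pub fn aborted_calls() -> Vec<usize> {
//!     let log = Rc::new(RefCell::new(Vec::<usize>::new()));
//!     let log_cloned = log.clone();
//!     let abort: Rc<dyn Fn(usize)> =
//!         Rc::new(move |seq_id: usize| log_cloned.borrow_mut().push(seq_id));
//!     let first = CallGuard::new(1, abort.clone());
//!     let second = CallGuard::new(2, abort.clone());
//!     drop(first); // dropped while pending: the abort callback runs
//!     second.complete(); // finished normally: no abort is sent
//!     let result = log.borrow().clone();
//!     result
//! }
//! ```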
//!
//! ### Notifications
//! Notifications are RPCs that do not return anything. On the client side, the method is completely
//! synchronous and also returns nothing. This setup is useful if you need to communicate with
//! another part of your application but cannot yield to the event loop.
//!
//! The implication of this, however, is that even if the server method is asynchronous, we are
//! unable to cancel it from the client side since we do not have a future that can be dropped.
//!
//! ### Posting and transferring JavaScript types
//! In the example above, we discussed how the client serializes its arguments before sending them
//! to the server. This approach is convenient, but how do we send web types such as a
//! `WebAssembly.Module` or an `OffscreenCanvas` that have no serializable representation? Well, we
//! are in luck since this happens to be one of the key features of this crate. Consider the
//! following RPC trait:
//! ```rust
//! #[web_rpc::service]
//! pub trait Concat {
//!     #[post(left, right, return)]
//!     fn concat_with_space(
//!         &self,
//!         left: js_sys::JsString,
//!         right: js_sys::JsString
//!     ) -> js_sys::JsString;
//! }
//! ```
//! All we have done is add the `post` attribute to the method and list the arguments that we
//! would like to be posted to the other side. Under the hood, the implementation of the client will
//! then skip these arguments during serialization and just append them after the serialized message
//! to the array that will be posted. As shown above, this also works for the return type by just
//! specifying `return` in the post attribute. For web types that need to be transferred, we simply
//! wrap them in `transfer` as follows:
//! ```rust
//! #[web_rpc::service]
//! pub trait GameEngine {
//!     #[post(transfer(canvas))]
//!     fn send_canvas(
//!         &self,
//!         canvas: web_sys::OffscreenCanvas,
//!     );
//! }
//! ```
//!
//! ### Optional and fallible JavaScript types
//! Posted JavaScript types can be wrapped in `Option` or `Result` to handle cases where
//! a value may be absent or an operation may fail. This works for both arguments and
//! return types, including streaming methods. When the value is `Some` or `Ok`, the
//! JavaScript object is posted as usual. When the value is `None` or `Err`, no JavaScript
//! object is sent; only a serialized discriminant travels over the wire.
//!
//! For example, a method that optionally returns a JavaScript string:
//! ```rust
//! #[web_rpc::service]
//! pub trait Lookup {
//!     #[post(return)]
//!     fn find(&self, key: u32) -> Option<js_sys::JsString>;
//! }
//!
//! struct LookupImpl;
//! impl Lookup for LookupImpl {
//!     fn find(&self, key: u32) -> Option<js_sys::JsString> {
//!         if key == 42 {
//!             Some(js_sys::JsString::from("found it"))
//!         } else {
//!             None
//!         }
//!     }
//! }
//! ```
//! The client receives `RequestFuture<Option<js_sys::JsString>>` and can check
//! whether the server returned a value.
//!
//! Similarly, a method that returns a `Result` where both the `Ok` and `Err` types
//! are JavaScript objects can use `#[post(return)]`, in which case both variants are posted:
//! ```rust
//! #[web_rpc::service]
//! pub trait Parser {
//!     #[post(return)]
//!     fn parse(&self, input: String) -> Result<js_sys::JsString, js_sys::Error>;
//! }
//!
//! struct ParserImpl;
//! impl Parser for ParserImpl {
//!     fn parse(&self, input: String) -> Result<js_sys::JsString, js_sys::Error> {
//!         if input.is_empty() {
//!             Err(js_sys::Error::new("empty input"))
//!         } else {
//!             Ok(js_sys::JsString::from(input.as_str()))
//!         }
//!     }
//! }
//! ```
//! Arguments can also be optional JavaScript types:
//! ```rust
//! #[web_rpc::service]
//! pub trait Formatter {
//!     #[post(label)]
//!     fn format(&self, label: Option<js_sys::JsString>) -> String;
//! }
//! ```
//! All of these wrappers combine with streaming methods too. For example,
//! `impl Stream<Item = Result<js_sys::JsString, String>>` with `#[post(return)]`
//! will stream `Result` values where the `Ok` variant carries a posted JavaScript
//! object and the `Err` variant carries a serialized error.
//!
//! ### Borrowed parameters
//! RPC methods can accept borrowed types such as `&str` and `&[u8]`, which are deserialized
//! zero-copy on the server side:
//! ```rust
//! #[web_rpc::service]
//! pub trait Greeter {
//!     fn greet(&self, name: &str, greeting: &str) -> String;
//!     fn count_bytes(&self, data: &[u8]) -> usize;
//! }
//!
//! struct GreeterServiceImpl;
//! impl Greeter for GreeterServiceImpl {
//!     fn greet(&self, name: &str, greeting: &str) -> String {
//!         format!("{greeting}, {name}!")
//!     }
//!     fn count_bytes(&self, data: &[u8]) -> usize {
//!         data.len()
//!     }
//! }
//! ```
//! This avoids unnecessary allocations: the server deserializes directly from the received
//! message bytes without copying into owned `String` or `Vec<u8>` types. On the client side,
//! borrowed parameters are serialized inline before the method returns, so standard Rust
//! lifetime rules apply. Note that only types with serde borrowing support (`&str`, `&[u8]`)
//! benefit from zero-copy deserialization.
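//!
//! The zero-copy idea can be illustrated without this crate. The following std-only sketch uses a
//! made-up wire format (a one-byte length followed by UTF-8 bytes) and the hypothetical helpers
//! `decode_str` and `borrows_from`; it is not the bincode encoding used here. It shows that the
//! decoded `&str` points into the received buffer instead of into a fresh allocation.
//!
//! ```rust
//! /// Decode a string from `message` without allocating.
//! /// Assumed layout (for illustration only): `[len: u8][len bytes of UTF-8]`.
//! pub fn decode_str(message: &[u8]) -> Option<&str> {
//!     let (&len, rest) = message.split_first()?;
//!     let bytes = rest.get(..len as usize)?;
//!     std::str::from_utf8(bytes).ok()
//! }
//!
//! /// Returns true if `s` is a view into `buffer` rather than a copy.
//! pub fn borrows_from(buffer: &[u8], s: &str) -> bool {
//!     let range = buffer.as_ptr_range();
//!     let start = s.as_ptr();
//!     range.start <= start && start <= range.end
//! }
//! ```
//!
//! The lifetime of the returned `&str` is tied to `message`, which is why a borrowed argument
//! cannot outlive the message it was decoded from.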
//!
//! ### Streaming
//! Methods can return a stream of items using `impl Stream<Item = T>` as the return type.
//! The macro detects this and generates the appropriate client and server code. On the client
//! side, the method returns a [`client::StreamReceiver<T>`] which implements
//! [`futures_core::Stream`]. On the server side, the return type is preserved as-is:
//! ```rust
//! #[web_rpc::service]
//! pub trait DataSource {
//!     fn stream_data(&self, count: u32) -> impl futures_core::Stream<Item = u32>;
//! }
//!
//! struct DataSourceImpl;
//! impl DataSource for DataSourceImpl {
//!     fn stream_data(&self, count: u32) -> impl futures_core::Stream<Item = u32> {
//!         let (tx, rx) = futures_channel::mpsc::unbounded();
//!         for i in 0..count {
//!             let _ = tx.unbounded_send(i);
//!         }
//!         rx
//!     }
//! }
//! ```
//! Dropping the [`client::StreamReceiver`] sends an abort signal to the server, cancelling the
//! stream. Alternatively, calling [`close`](client::StreamReceiver::close) stops the server
//! while still allowing buffered items to be drained. Streaming methods can also be async and
//! can be combined with the `#[post(return)]` attribute for streaming JavaScript types.
//!
//! ### Bi-directional RPC
//! In the original example, we created a server on the first port of the message channel and a
//! client on the second port. However, it is possible to define both a client and a server on each
//! side, enabling bi-directional RPC. This is particularly useful if we want to send and receive
//! messages from a worker without sending it a separate channel for the bi-directional
//! communication. Our original example can be extended as follows:
//! ```rust,no_run
//! # #[web_rpc::service]
//! # pub trait Calculator {
//! #     fn add(&self, left: u32, right: u32) -> u32;
//! # }
//! # struct CalculatorServiceImpl;
//! # impl Calculator for CalculatorServiceImpl {
//! #     fn add(&self, left: u32, right: u32) -> u32 { left + right }
//! # }
//! # async fn example() {
//! /* create channel */
//! let channel = web_sys::MessageChannel::new().unwrap();
//! let (interface1, interface2) = futures_util::future::join(
//!     web_rpc::Interface::new(channel.port1()),
//!     web_rpc::Interface::new(channel.port2()),
//! ).await;
//! /* create server1 and client1 */
//! let (client1, server1) = web_rpc::Builder::new(interface1)
//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
//!     .with_client::<CalculatorClient>()
//!     .build();
//! /* create server2 and client2 */
//! let (client2, server2) = web_rpc::Builder::new(interface2)
//!     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
//!     .with_client::<CalculatorClient>()
//!     .build();
//! # }
//! ```

use std::{
    cell::RefCell,
    marker::PhantomData,
    pin::Pin,
    rc::Rc,
    task::{Context, Poll},
};

use futures_channel::mpsc;
use futures_core::{future::LocalBoxFuture, Future};
use futures_util::{FutureExt, StreamExt};
use gloo_events::EventListener;
use js_sys::{ArrayBuffer, Uint8Array};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use wasm_bindgen::JsCast;

#[doc(hidden)]
pub use bincode;
#[doc(hidden)]
pub use futures_channel;
#[doc(hidden)]
pub use futures_core;
#[doc(hidden)]
pub use futures_util;
#[doc(hidden)]
pub use gloo_events;
#[doc(hidden)]
pub use js_sys;
#[doc(hidden)]
pub use pin_utils;
#[doc(hidden)]
pub use serde;
#[doc(hidden)]
pub use wasm_bindgen;

pub use web_rpc_macro::service;

pub mod client;
pub mod interface;
pub mod port;
#[doc(hidden)]
pub mod service;

pub use interface::Interface;

#[doc(hidden)]
#[derive(Serialize, Deserialize)]
pub enum MessageHeader {
    Request(usize),
    Abort(usize),
    Response(usize),
    StreamItem(usize),
    StreamEnd(usize),
}

/// This struct allows one to configure the RPC interface prior to creating it.
/// To get an instance of this struct, call [`Builder<C, S>::new`] with
/// an [`Interface`].
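///
/// The two type parameters act as compile-time state: which `build` method is available depends
/// on whether a client type, a service, or both were configured. A reduced, std-only sketch of
/// that pattern follows. `Config`, `Tag`, `with_tag`, and `with_value` are hypothetical stand-ins
/// and use concrete types where this crate uses trait bounds.
///
/// ```rust
/// use std::marker::PhantomData;
///
/// /// Hypothetical builder: `C` is only a type-level marker, `S` holds a value.
/// pub struct Config<C, S> {
///     client: PhantomData<C>,
///     service: S,
/// }
///
/// /// Hypothetical marker type playing the role of a generated client.
/// pub struct Tag;
///
/// impl Config<(), ()> {
///     pub fn new() -> Self {
///         Config { client: PhantomData, service: () }
///     }
/// }
///
/// impl<S> Config<(), S> {
///     /// Record a client type without storing any value.
///     pub fn with_tag<C>(self) -> Config<C, S> {
///         Config { client: PhantomData, service: self.service }
///     }
/// }
///
/// impl<C> Config<C, ()> {
///     /// Store a service value, changing the second type parameter.
///     pub fn with_value<S>(self, service: S) -> Config<C, S> {
///         Config { client: self.client, service }
///     }
/// }
///
/// // Each combination gets its own inherent `build`, so the return type
/// // depends on what was configured.
/// impl Config<Tag, ()> {
///     pub fn build(self) -> &'static str { "client only" }
/// }
///
/// impl Config<(), u32> {
///     pub fn build(self) -> u32 { self.service }
/// }
///
/// impl Config<Tag, u32> {
///     pub fn build(self) -> (&'static str, u32) { ("client", self.service) }
/// }
/// ```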
pub struct Builder<C, S> {
    client: PhantomData<C>,
    service: S,
    interface: Interface,
}

impl Builder<(), ()> {
    /// Create a new builder from an [`Interface`]
    pub fn new(interface: Interface) -> Self {
        Self {
            interface,
            client: PhantomData::<()>,
            service: (),
        }
    }
}

impl<C> Builder<C, ()> {
    /// Configure the RPC interface with a service that implements methods
    /// that can be called from the other side of the channel. To use this method,
    /// you need to specify the type `S` which is the service type generated by the
    /// attribute macro [`macro@service`]. The implementation parameter is then an
    /// instance of something that implements the trait to which you applied the
    /// [`macro@service`] macro. For example, if you have a trait `Calculator` to
    /// which you have applied [`macro@service`], you would use this method as follows:
    /// ```rust,no_run
    /// # #[web_rpc::service]
    /// # pub trait Calculator {
    /// #     fn add(&self, left: u32, right: u32) -> u32;
    /// # }
    /// # struct CalculatorServiceImpl;
    /// # impl Calculator for CalculatorServiceImpl {
    /// #     fn add(&self, left: u32, right: u32) -> u32 { left + right }
    /// # }
    /// # fn example(some_interface: web_rpc::Interface) {
    /// let server = web_rpc::Builder::new(some_interface)
    ///     .with_service::<CalculatorService<_>>(CalculatorServiceImpl)
    ///     .build();
    /// # }
    /// ```
    pub fn with_service<S: service::Service>(self, implementation: impl Into<S>) -> Builder<C, S> {
        let service = implementation.into();
        let Builder {
            interface, client, ..
        } = self;
        Builder {
            interface,
            client,
            service,
        }
    }
}

impl<S> Builder<(), S> {
    /// Configure the RPC interface with a client that allows you to execute RPCs on the
    /// server. The builder will automatically instantiate the client for you; you just
    /// need to provide the type which is generated via the [`macro@service`] attribute
    /// macro. For example, if you had a trait `Calculator` to which you applied the
    /// [`macro@service`] attribute macro, the macro would have generated a `CalculatorClient`
    /// struct which you can use as the `C` in this function.
    pub fn with_client<C: client::Client>(self) -> Builder<C, S> {
        let Builder {
            interface, service, ..
        } = self;
        Builder {
            interface,
            client: PhantomData::<C>,
            service,
        }
    }
}

/// `Server` is the server that is returned from the [`Builder::build`] method, provided that
/// you configured the RPC interface with a service. Note that `Server` implements `Future` and
/// needs to be polled in order to execute and respond to inbound RPC requests.
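///
/// Why polling matters can be seen with any Rust future. The std-only sketch below is not
/// specific to this crate; `NoopWake` and `runs_only_when_polled` are hypothetical names. It
/// shows that the body of a future does not run when the future is created, only when it is
/// polled.
///
/// ```rust
/// use std::cell::Cell;
/// use std::future::Future;
/// use std::rc::Rc;
/// use std::sync::Arc;
/// use std::task::{Context, Poll, Wake, Waker};
///
/// /// Waker that does nothing; enough to poll a future by hand.
/// struct NoopWake;
///
/// impl Wake for NoopWake {
///     fn wake(self: Arc<Self>) {}
/// }
///
/// /// Returns whether the task body had run (before polling, after polling).
/// pub fn runs_only_when_polled() -> (bool, bool) {
///     let ran = Rc::new(Cell::new(false));
///     let ran_cloned = ran.clone();
///     // Creating the future does not execute its body.
///     let mut task = Box::pin(async move { ran_cloned.set(true) });
///     let before = ran.get();
///     let waker = Waker::from(Arc::new(NoopWake));
///     let mut cx = Context::from_waker(&waker);
///     // Polling drives the body to completion.
///     let done = task.as_mut().poll(&mut cx) == Poll::Ready(());
///     (before, ran.get() && done)
/// }
/// ```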
#[must_use = "Server must be polled in order for RPC requests to be executed"]
pub struct Server {
    _listener: Rc<EventListener>,
    task: LocalBoxFuture<'static, ()>,
}

impl Future for Server {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.task.poll_unpin(cx)
    }
}

impl<C> Builder<C, ()>
where
    C: client::Client + From<client::Configuration<C::Response>> + 'static,
    <C as client::Client>::Response: DeserializeOwned,
{
    /// Build function for client-only RPC interfaces.
    pub fn build(self) -> C {
        let Builder {
            interface:
                Interface {
                    port,
                    listener,
                    mut messages_rx,
                },
            ..
        } = self;
        let client_callback_map: Rc<RefCell<client::CallbackMap<C::Response>>> = Default::default();
        let client_callback_map_cloned = client_callback_map.clone();
        let stream_callback_map: Rc<RefCell<client::StreamCallbackMap<C::Response>>> =
            Default::default();
        let stream_callback_map_cloned = stream_callback_map.clone();
        let dispatcher = async move {
            while let Some(array) = messages_rx.next().await {
                let header_bytes =
                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
                let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
                match header {
                    MessageHeader::Response(seq_id) => {
                        let payload_bytes =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
                        if let Some(callback_tx) =
                            client_callback_map_cloned.borrow_mut().remove(&seq_id)
                        {
                            let _ = callback_tx.send((response, array));
                        }
                    }
                    MessageHeader::StreamItem(seq_id) => {
                        let payload_bytes =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
                        if let Some(tx) = stream_callback_map_cloned.borrow().get(&seq_id) {
                            let _ = tx.unbounded_send((response, array));
                        }
                    }
                    MessageHeader::StreamEnd(seq_id) => {
                        stream_callback_map_cloned.borrow_mut().remove(&seq_id);
                    }
                    _ => panic!("client received a server message"),
                }
            }
        }
        .boxed_local()
        .shared();
        let port_cloned = port.clone();
        let abort_sender = move |seq_id: usize| {
            let header = MessageHeader::Abort(seq_id);
            let header_bytes = bincode::serialize(&header).unwrap();
            let buffer = js_sys::Uint8Array::from(&header_bytes[..]).buffer();
            let post_args = js_sys::Array::of1(&buffer);
            let transfer_args = js_sys::Array::of1(&buffer);
            port_cloned
                .post_message(&post_args, &transfer_args)
                .unwrap();
        };
        C::from((
            client_callback_map,
            stream_callback_map,
            port,
            Rc::new(listener),
            dispatcher,
            Rc::new(abort_sender),
        ))
    }
}

impl<S> Builder<(), S>
where
    S: service::Service + 'static,
    <S as service::Service>::Response: Serialize,
{
    /// Build function for server-only RPC interfaces.
    pub fn build(self) -> Server {
        let Builder {
            service,
            interface:
                Interface {
                    port,
                    listener,
                    mut messages_rx,
                },
            ..
        } = self;
        let (server_requests_tx, server_requests_rx) = mpsc::unbounded();
        let (abort_requests_tx, abort_requests_rx) = mpsc::unbounded();
        let dispatcher = async move {
            while let Some(array) = messages_rx.next().await {
                let header_bytes =
                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
                let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
                match header {
                    MessageHeader::Request(seq_id) => {
                        let payload =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        server_requests_tx
                            .unbounded_send((seq_id, payload, array))
                            .unwrap();
                    }
                    MessageHeader::Abort(seq_id) => {
                        abort_requests_tx.unbounded_send(seq_id).unwrap();
                    }
                    _ => panic!("server received a client message"),
                }
            }
        }
        .boxed_local()
        .shared();
        Server {
            _listener: Rc::new(listener),
            task: service::task::<S>(
                service,
                port,
                dispatcher,
                server_requests_rx,
                abort_requests_rx,
            )
            .boxed_local(),
        }
    }
}

impl<C, S> Builder<C, S>
where
    C: client::Client + From<client::Configuration<C::Response>> + 'static,
    S: service::Service + 'static,
    <S as service::Service>::Response: Serialize,
    <C as client::Client>::Response: DeserializeOwned,
{
    /// Build function for client-server RPC interfaces.
    pub fn build(self) -> (C, Server) {
        let Builder {
            service: server,
            interface:
                Interface {
                    port,
                    listener,
                    mut messages_rx,
                },
            ..
        } = self;
        let client_callback_map: Rc<RefCell<client::CallbackMap<C::Response>>> = Default::default();
        let stream_callback_map: Rc<RefCell<client::StreamCallbackMap<C::Response>>> =
            Default::default();
        let (server_requests_tx, server_requests_rx) = mpsc::unbounded();
        let (abort_requests_tx, abort_requests_rx) = mpsc::unbounded();
        let client_callback_map_cloned = client_callback_map.clone();
        let stream_callback_map_cloned = stream_callback_map.clone();
        let dispatcher = async move {
            while let Some(array) = messages_rx.next().await {
                let header_bytes =
                    Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap()).to_vec();
                let header: MessageHeader = bincode::deserialize(&header_bytes).unwrap();
                match header {
                    MessageHeader::Response(seq_id) => {
                        let payload_bytes =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
                        if let Some(callback_tx) =
                            client_callback_map_cloned.borrow_mut().remove(&seq_id)
                        {
                            let _ = callback_tx.send((response, array));
                        }
                    }
                    MessageHeader::StreamItem(seq_id) => {
                        let payload_bytes =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        let response: C::Response = bincode::deserialize(&payload_bytes).unwrap();
                        if let Some(tx) = stream_callback_map_cloned.borrow().get(&seq_id) {
                            let _ = tx.unbounded_send((response, array));
                        }
                    }
                    MessageHeader::StreamEnd(seq_id) => {
                        stream_callback_map_cloned.borrow_mut().remove(&seq_id);
                    }
                    MessageHeader::Request(seq_id) => {
                        let payload =
                            Uint8Array::new(&array.shift().dyn_into::<ArrayBuffer>().unwrap())
                                .to_vec();
                        server_requests_tx
                            .unbounded_send((seq_id, payload, array))
                            .unwrap();
                    }
                    MessageHeader::Abort(seq_id) => {
                        abort_requests_tx.unbounded_send(seq_id).unwrap();
                    }
                }
            }
        }
        .boxed_local()
        .shared();
        let port_cloned = port.clone();
        let abort_sender = move |seq_id: usize| {
            let header = MessageHeader::Abort(seq_id);
            let header_bytes = bincode::serialize(&header).unwrap();
            let buffer = js_sys::Uint8Array::from(&header_bytes[..]).buffer();
            let post_args = js_sys::Array::of1(&buffer);
            let transfer_args = js_sys::Array::of1(&buffer);
            port_cloned
                .post_message(&post_args, &transfer_args)
                .unwrap();
        };
        let listener = Rc::new(listener);
        let client = C::from((
            client_callback_map,
            stream_callback_map,
            port.clone(),
            listener.clone(),
            dispatcher.clone(),
            Rc::new(abort_sender),
        ));
        let server = Server {
            _listener: listener,
            task: service::task::<S>(
                server,
                port,
                dispatcher,
                server_requests_rx,
                abort_requests_rx,
            )
            .boxed_local(),
        };
        (client, server)
    }
}
693}