web_rpc/interface.rs
1use futures_channel::{mpsc, oneshot};
2use futures_util::future;
3use wasm_bindgen::{JsCast, JsValue};
4
5/// An interface represents a [`crate::port::Port`] that has been fully initialised and has
6/// verified that the other end of the channel is ready to receive messages.
7pub struct Interface {
8 pub(crate) port: crate::port::Port,
9 pub(crate) listener: gloo_events::EventListener,
10 pub(crate) messages_rx: mpsc::UnboundedReceiver<js_sys::Array>,
11}
12
13impl Interface {
14 /// Create a new interface from anything that implements `Into<Port>`, for example, a
15 /// [`web_sys::MessagePort`], a [`web_sys::Worker`], or a
16 /// [`web_sys::DedicatedWorkerGlobalScope`]. This function is async and resolves to the new
17 /// interface instance once the other side of the channel is ready.
18 pub async fn new(port: impl Into<crate::port::Port>) -> Self {
19 let port = port.into();
20 let (dispatcher_tx, dispatcher_rx) = mpsc::unbounded();
21 let (ready_tx, ready_rx) = oneshot::channel();
22 let mut ready_tx = Option::from(ready_tx);
23 let listener =
24 gloo_events::EventListener::new(port.event_target(), "message", move |event| {
25 let message = event.unchecked_ref::<web_sys::MessageEvent>().data();
26 match message.dyn_into::<js_sys::Array>() {
27 /* default path, enqueue the message for deserialization by the dispatcher */
28 Ok(array) => {
29 let _ = dispatcher_tx.unbounded_send(array);
30 }
31 /* handshake path */
32 Err(_) => {
33 if let Some(ready_tx) = ready_tx.take() {
34 let _ = ready_tx.send(());
35 }
36 }
37 }
38 });
39 /* needed for MessagePort */
40 port.start();
41 /* poll other end of the channel */
42 let port_cloned = port.clone();
43 let poll = async move {
44 loop {
45 port_cloned
46 .post_message(&JsValue::NULL, &JsValue::UNDEFINED)
47 .unwrap();
48 gloo_timers::future::TimeoutFuture::new(10).await;
49 }
50 };
51 pin_utils::pin_mut!(poll);
52 future::select(ready_rx, poll).await;
53 /* at this point we know the other end's listener is available, but we may
54 need to send one last message to indicate that we are available */
55 port.post_message(&JsValue::NULL, &JsValue::UNDEFINED)
56 .unwrap();
57 /* return the interface */
58 Self {
59 messages_rx: dispatcher_rx,
60 listener,
61 port,
62 }
63 }
64}