web_rpc/
interface.rs

1use futures_channel::{mpsc, oneshot};
2use futures_util::future;
3use wasm_bindgen::{JsCast, JsValue};
4
5/// An interface represents a [`crate::port::Port`] that has been fully initialised and has
6/// verified that the other end of the channel is ready to receive messages.
7pub struct Interface {
8    pub(crate) port: crate::port::Port,
9    pub(crate) listener: gloo_events::EventListener,
10    pub(crate) messages_rx: mpsc::UnboundedReceiver<js_sys::Array>,
11}
12
13impl Interface {
14    /// Create a new interface from anything that implements `Into<Port>`, for example, a
15    /// [`web_sys::MessagePort`], a [`web_sys::Worker`], or a
16    /// [`web_sys::DedicatedWorkerGlobalScope`]. This function is async and resolves to the new
17    /// interface instance once the other side of the channel is ready.
18    pub async fn new(port: impl Into<crate::port::Port>) -> Self {
19        let port = port.into();
20        let (dispatcher_tx, dispatcher_rx) = mpsc::unbounded();
21        let (ready_tx, ready_rx) = oneshot::channel();
22        let mut ready_tx = Option::from(ready_tx);
23        let listener =
24            gloo_events::EventListener::new(port.event_target(), "message", move |event| {
25                let message = event.unchecked_ref::<web_sys::MessageEvent>().data();
26                match message.dyn_into::<js_sys::Array>() {
27                    /* default path, enqueue the message for deserialization by the dispatcher */
28                    Ok(array) => {
29                        let _ = dispatcher_tx.unbounded_send(array);
30                    }
31                    /* handshake path */
32                    Err(_) => {
33                        if let Some(ready_tx) = ready_tx.take() {
34                            let _ = ready_tx.send(());
35                        }
36                    }
37                }
38            });
39        /* needed for MessagePort */
40        port.start();
41        /* poll other end of the channel */
42        let port_cloned = port.clone();
43        let poll = async move {
44            loop {
45                port_cloned
46                    .post_message(&JsValue::NULL, &JsValue::UNDEFINED)
47                    .unwrap();
48                gloo_timers::future::TimeoutFuture::new(10).await;
49            }
50        };
51        pin_utils::pin_mut!(poll);
52        future::select(ready_rx, poll).await;
53        /* at this point we know the other end's listener is available, but we may
54        need to send one last message to indicate that we are available */
55        port.post_message(&JsValue::NULL, &JsValue::UNDEFINED)
56            .unwrap();
57        /* return the interface */
58        Self {
59            messages_rx: dispatcher_rx,
60            listener,
61            port,
62        }
63    }
64}