web_rpc/
interface.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
use futures_channel::{mpsc, oneshot};
use futures_util::future;
use wasm_bindgen::{JsCast, JsValue};

/// An interface represents a [`crate::port::Port`] that has been fully initialised and
/// has verified that the other end of the channel is ready to receive messages.
pub struct Interface {
    pub(crate) port: crate::port::Port,
    pub(crate) listener: gloo_events::EventListener,
    pub(crate) messages_rx: mpsc::UnboundedReceiver<js_sys::Array>,
}

impl Interface {
    /// Create a new interface from anything that implements `Into<Port>`, for example,
    /// a [`web_sys::MessagePort`], a [`web_sys::Worker`], or a [`web_sys::DedicatedWorkerGlobalScope`].
    /// This function is async and resolves to the new interface instance once the other side of
    /// the channel is ready.
    pub async fn new(port: impl Into<crate::port::Port>) -> Self {
        let port = port.into();
        let (dispatcher_tx, dispatcher_rx) = mpsc::unbounded();
        let (ready_tx, ready_rx) = oneshot::channel();
        let mut ready_tx = Option::from(ready_tx);
        let listener = gloo_events::EventListener::new(port.event_target(), "message", move |event| {
            let message = event.unchecked_ref::<web_sys::MessageEvent>().data();
            match message.dyn_into::<js_sys::Array>() {
                /* default path, enqueue the message for deserialization by the dispatcher */
                Ok(array) => {
                    let _ = dispatcher_tx.unbounded_send(array);
                },
                /* handshake path */
                Err(_) => if let Some(ready_tx) = ready_tx.take() {
                    let _ = ready_tx.send(());
                }
            }
        });
        /* needed for MessagePort */
        port.start();
        /* poll other end of the channel */
        let port_cloned = port.clone();
        let poll = async move {
            loop {
                port_cloned.post_message(&JsValue::NULL, &JsValue::UNDEFINED).unwrap();
                gloo_timers::future::TimeoutFuture::new(10).await;
            }
        };
        pin_utils::pin_mut!(poll);
        future::select(ready_rx, poll).await;
        /* at this point we know the other end's listener is available, but we may
           need to send one last message to indicate that we are available */
        port.post_message(&JsValue::NULL, &JsValue::UNDEFINED).unwrap();
        /* return the interface */
        Self {
            messages_rx: dispatcher_rx,
            listener,
            port,
        }
    }
}