use std::{cell::RefCell, rc::Rc};
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::mpsc;
use wasm_bindgen::prelude::*;
use web_sys::{MessageChannel, MessageEvent, MessagePort};
use crate::{
convert::{from_bytes, to_bytes},
error::InitError,
};
/// Handle around one end of a browser `MessagePort`.
///
/// Incoming payloads are queued by the port's `onmessage` handler and pulled
/// out with `recv` / `recv_bytes`; outgoing payloads are posted through the
/// wrapped port. Cloning yields another handle to the *same* queue and port
/// (the receiver sits behind an `Rc`), not an independent channel.
#[derive(Clone)]
pub struct Channel {
/// Queue of raw `JsValue` payloads delivered by the port's `onmessage`
/// handler. Shared between clones; `RefCell` supplies the `&mut` access
/// that receiving requires.
messages: Rc<RefCell<mpsc::UnboundedReceiver<JsValue>>>,
/// Port that outgoing messages are posted to.
port: MessagePort,
}
impl Channel {
    /// Creates a new `MessageChannel` and wraps its first port.
    ///
    /// Returns the wrapped [`Channel`] together with the second, raw
    /// [`MessagePort`] of the pair, which the caller can hand to the peer.
    ///
    /// # Errors
    ///
    /// Returns [`InitError::ChannelCreation`] if `MessageChannel::new` fails.
    pub fn new() -> Result<(Self, MessagePort), InitError> {
        let channel = MessageChannel::new().map_err(InitError::ChannelCreation)?;
        Ok((Self::from(channel.port1()), channel.port2()))
    }

    /// Builds the `onmessage` handler that forwards every event's payload
    /// into `sender`.
    fn on_message_callback(
        sender: mpsc::UnboundedSender<JsValue>,
    ) -> Closure<dyn FnMut(MessageEvent)> {
        Closure::new(move |event: MessageEvent| {
            // The send result is ignored on purpose: an error here means the
            // receiving half is gone, so there is nobody left to deliver to.
            let _ = sender.send(event.data());
        })
    }

    /// Waits for the next message and decodes it with `from_bytes`.
    ///
    /// Returns `None` when [`Channel::recv_bytes`] returns `None`.
    ///
    /// # Panics
    ///
    /// Shares the panic condition of [`Channel::recv_bytes`].
    pub async fn recv<T: DeserializeOwned>(&self) -> Option<T> {
        let bytes = self.recv_bytes().await?;
        Some(from_bytes(&bytes))
    }

    /// Waits for the next message and returns its payload as raw bytes.
    ///
    /// The payload is copied out of JS memory via a `Uint8Array` built from
    /// the received value. Returns `None` when the internal queue reports
    /// that it is closed.
    ///
    /// # Panics
    ///
    /// Panics if another `recv` / `recv_bytes` call on this channel (or any
    /// clone of it) is still pending, because the shared receiver is already
    /// mutably borrowed.
    // The `RefMut` is held across the `.await` deliberately: the receiver
    // needs `&mut` access for as long as we wait on it. This is only sound
    // with a single receiver at a time (see `# Panics`).
    #[allow(clippy::await_holding_refcell_ref)]
    pub async fn recv_bytes(&self) -> Option<Box<[u8]>> {
        let mut messages = self.messages.borrow_mut();
        let value = messages.recv().await?;
        // Release the borrow before doing any further work.
        drop(messages);
        let array = js_sys::Uint8Array::new(&value);
        Some(array.to_vec().into_boxed_slice())
    }

    /// Encodes `msg` with `to_bytes` and posts it through the port.
    ///
    /// # Panics
    ///
    /// Shares the panic condition of [`Channel::send_bytes`].
    pub fn send<T: Serialize>(&self, msg: &T) {
        let bytes = to_bytes(msg);
        self.send_bytes(&bytes);
    }

    /// Posts `bytes` through the port as a `Uint8Array`.
    ///
    /// # Panics
    ///
    /// Panics with "Channel is already closed" if `post_message` returns an
    /// error.
    pub fn send_bytes(&self, bytes: &[u8]) {
        // `Uint8Array::from(&[u8])` allocates a JS-owned array and copies the
        // slice in one step, so no manual length computation (and no lossy
        // `usize as u32` cast) is needed.
        let array = js_sys::Uint8Array::from(bytes);
        self.port
            .post_message(&array)
            .expect("Channel is already closed");
    }
}
impl From<MessagePort> for Channel {
    /// Wraps an existing `MessagePort`, installing an `onmessage` handler
    /// that queues every incoming payload for later retrieval.
    ///
    /// Any handler previously set through the port's `onmessage` property is
    /// replaced.
    fn from(port: MessagePort) -> Self {
        let (tx, rx) = mpsc::unbounded_channel();

        let on_message = Self::on_message_callback(tx);
        port.set_onmessage(Some(on_message.as_ref().unchecked_ref()));
        // Intentionally leak the closure: the port only holds a JS reference
        // to it, so the Rust side must not drop it while the handler may
        // still be invoked.
        on_message.forget();

        let messages = Rc::new(RefCell::new(rx));
        Self { messages, port }
    }
}