use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::JsFuture;
use web_sys::{
MessageEvent, RtcConfiguration, RtcDataChannel, RtcDataChannelInit, RtcDataChannelType,
RtcIceGatheringState, RtcIceServer, RtcPeerConnection, RtcSdpType, RtcSessionDescriptionInit,
};
/// STUN server URL that `new_pc` installs as the only ICE server.
const STUN_URL: &str = "stun:stun.l.google.com:19302";
/// Label passed when `open_channel` creates the data channel.
const CHANNEL_LABEL: &str = "lh-sharedfs";
/// Id assigned to the data channel in `open_channel`, which also marks the
/// channel as negotiated; `offer` and `answer` both go through that function
/// and therefore use the same id.
const CHANNEL_ID: u16 = 0;
/// One end of a WebRTC connection together with its single data channel.
///
/// Built by `Peer::offer` or `Peer::answer`; dropping it detaches the message
/// handler and closes both the channel and the connection.
pub(crate) struct Peer {
    /// Underlying peer connection.
    pc: RtcPeerConnection,
    /// Data channel used by `send`, `is_open` and `sender`.
    channel: RtcDataChannel,
    /// The `onmessage` callback registered on `channel`. It is stored here so
    /// the closure stays alive for as long as the `Peer` does.
    _on_message: Closure<dyn FnMut(MessageEvent)>,
}
/// Builds an [`RtcPeerConnection`] configured with a single ICE server: the
/// STUN endpoint named by [`STUN_URL`].
///
/// # Errors
/// Propagates the `JsValue` error returned by
/// `RtcPeerConnection::new_with_configuration`.
fn new_pc() -> Result<RtcPeerConnection, JsValue> {
    let config = RtcConfiguration::new();
    let ice_servers = js_sys::Array::new();

    // Describe the one STUN server and add it to the server list.
    let stun = RtcIceServer::new();
    stun.set_urls(&JsValue::from_str(STUN_URL));
    ice_servers.push(&stun);

    config.set_ice_servers(&ice_servers);
    RtcPeerConnection::new_with_configuration(&config)
}
/// Creates the negotiated data channel on `pc` and wires `on_msg` to it.
///
/// The channel uses [`CHANNEL_LABEL`] and [`CHANNEL_ID`] and is switched to
/// `ArrayBuffer` binary delivery. Each incoming message is handed to `on_msg`
/// as raw bytes: `ArrayBuffer` payloads are copied out, string payloads are
/// passed as their UTF-8 bytes, and any other payload kind is ignored.
///
/// Returns the channel together with the closure registered as its
/// `onmessage` handler; the caller receives ownership of that closure.
fn open_channel(
    pc: &RtcPeerConnection,
    mut on_msg: impl FnMut(Vec<u8>) + 'static,
) -> (RtcDataChannel, Closure<dyn FnMut(MessageEvent)>) {
    let options = RtcDataChannelInit::new();
    options.set_negotiated(true);
    options.set_id(CHANNEL_ID);

    let dc = pc.create_data_channel_with_data_channel_dict(CHANNEL_LABEL, &options);
    dc.set_binary_type(RtcDataChannelType::Arraybuffer);

    let handler: Box<dyn FnMut(MessageEvent)> = Box::new(move |event: MessageEvent| {
        let payload = event.data();
        let bytes = match payload.dyn_ref::<js_sys::ArrayBuffer>() {
            Some(buffer) => js_sys::Uint8Array::new(buffer).to_vec(),
            None => match payload.as_string() {
                Some(text) => text.into_bytes(),
                // Neither binary nor text: nothing to forward.
                None => return,
            },
        };
        on_msg(bytes);
    });
    let cb = Closure::wrap(handler);
    dc.set_onmessage(Some(cb.as_ref().unchecked_ref()));

    (dc, cb)
}
/// Polls `pc` until ICE gathering reports `Complete`, giving up after 100
/// polls.
///
/// Between polls it awaits `crate::runtime::sleep_ms(50)` (presumably a 50 ms
/// pause, i.e. about 5 s in total — confirm against that helper). Running out
/// of polls is not reported: the function simply returns, and the caller
/// proceeds with whatever has been gathered so far.
async fn wait_for_ice(pc: &RtcPeerConnection) {
    let mut polls_left = 100;
    while polls_left > 0 && pc.ice_gathering_state() != RtcIceGatheringState::Complete {
        crate::runtime::sleep_ms(50).await;
        polls_left -= 1;
    }
}
/// Returns the SDP text of `pc`'s current local description.
///
/// # Errors
/// Yields the JS string `"no local description"` when none has been set.
fn local_sdp(pc: &RtcPeerConnection) -> Result<String, JsValue> {
    match pc.local_description() {
        Some(description) => Ok(description.sdp()),
        None => Err(JsValue::from_str("no local description")),
    }
}
impl Peer {
pub(crate) async fn offer(
on_msg: impl FnMut(Vec<u8>) + 'static,
) -> Result<(Self, String), JsValue> {
let pc = new_pc()?;
let (channel, on_message) = open_channel(&pc, on_msg);
let offer = JsFuture::from(pc.create_offer()).await?;
let sdp = js_sys::Reflect::get(&offer, &JsValue::from_str("sdp"))?
.as_string()
.ok_or_else(|| JsValue::from_str("offer has no sdp"))?;
let desc = RtcSessionDescriptionInit::new(RtcSdpType::Offer);
desc.set_sdp(&sdp);
JsFuture::from(pc.set_local_description(&desc)).await?;
wait_for_ice(&pc).await;
let out = local_sdp(&pc)?;
Ok((
Self {
pc,
channel,
_on_message: on_message,
},
out,
))
}
pub(crate) async fn answer(
offer_sdp: &str,
on_msg: impl FnMut(Vec<u8>) + 'static,
) -> Result<(Self, String), JsValue> {
let pc = new_pc()?;
let (channel, on_message) = open_channel(&pc, on_msg);
let remote = RtcSessionDescriptionInit::new(RtcSdpType::Offer);
remote.set_sdp(offer_sdp);
JsFuture::from(pc.set_remote_description(&remote)).await?;
let answer = JsFuture::from(pc.create_answer()).await?;
let sdp = js_sys::Reflect::get(&answer, &JsValue::from_str("sdp"))?
.as_string()
.ok_or_else(|| JsValue::from_str("answer has no sdp"))?;
let desc = RtcSessionDescriptionInit::new(RtcSdpType::Answer);
desc.set_sdp(&sdp);
JsFuture::from(pc.set_local_description(&desc)).await?;
wait_for_ice(&pc).await;
let out = local_sdp(&pc)?;
Ok((
Self {
pc,
channel,
_on_message: on_message,
},
out,
))
}
pub(crate) async fn accept_answer(&self, answer_sdp: &str) -> Result<(), JsValue> {
let remote = RtcSessionDescriptionInit::new(RtcSdpType::Answer);
remote.set_sdp(answer_sdp);
JsFuture::from(self.pc.set_remote_description(&remote)).await?;
Ok(())
}
pub(crate) fn is_open(&self) -> bool {
self.channel.ready_state() == web_sys::RtcDataChannelState::Open
}
pub(crate) fn send(&self, bytes: &[u8]) -> Result<(), JsValue> {
self.channel.send_with_u8_array(bytes)
}
pub(crate) fn sender(&self) -> RtcDataChannel {
self.channel.clone()
}
}
impl Drop for Peer {
fn drop(&mut self) {
self.channel.set_onmessage(None);
self.channel.close();
self.pc.close();
}
}