use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::JsFuture;
use web_sys::{
MessageEvent, RtcConfiguration, RtcDataChannel, RtcDataChannelInit, RtcDataChannelType,
RtcIceGatheringState, RtcIceServer, RtcPeerConnection, RtcSdpType, RtcSessionDescriptionInit,
};
// STUN server used by `default_ice` when no ICE server list is available from the registry.
const STUN_URL: &str = "stun:stun.l.google.com:19302";
// Label and pre-agreed id of the first negotiated data channel (the one `Peer::send` uses).
const CHANNEL_LABEL: &str = "lh-sharedfs";
const CHANNEL_ID: u16 = 0;
// Label and pre-agreed id of the second negotiated data channel (the one `Peer::send_game` uses).
const GAME_CHANNEL_LABEL: &str = "lh-game";
const GAME_CHANNEL_ID: u16 = 1;
/// One WebRTC peer connection together with the two data channels opened on it.
///
/// The `Drop` implementation detaches the message handlers and closes both
/// channels and the connection.
pub(crate) struct Peer {
// The underlying peer connection.
pc: RtcPeerConnection,
// Channel created with `CHANNEL_LABEL` / `CHANNEL_ID`; used by `send`, `sender` and `is_open`.
channel: RtcDataChannel,
// Channel created with `GAME_CHANNEL_LABEL` / `GAME_CHANNEL_ID` (unordered, zero retransmits);
// used by `send_game`.
game: RtcDataChannel,
// The closure installed as `onmessage` on both channels; stored so it lives as long as the peer.
_on_message: Closure<dyn FnMut(MessageEvent)>,
}
thread_local! {
// Cache of the ICE server array: `None` until `ice_servers` has resolved once,
// after which the stored array is returned without another fetch.
static ICE_SERVERS: std::cell::RefCell<Option<js_sys::Array>> =
const { std::cell::RefCell::new(None) };
}
/// Builds the fallback ICE server list: a one-element array holding a single
/// server entry whose `urls` is [`STUN_URL`].
fn default_ice() -> js_sys::Array {
    let list = js_sys::Array::new();
    let stun = RtcIceServer::new();
    stun.set_urls(&JsValue::from_str(STUN_URL));
    list.push(&stun);
    list
}
/// Returns the ICE server array to configure new peer connections with.
///
/// The first call asks the registry for its ICE JSON and uses the non-empty
/// `iceServers` array found in it. If the fetch fails, the text does not
/// parse, the field is missing, is not an array, or is empty, the list from
/// `default_ice` is used instead. Whatever was chosen (fallback included) is
/// stored in `ICE_SERVERS` and returned directly by later calls.
async fn ice_servers() -> js_sys::Array {
    let cached = ICE_SERVERS.with(|slot| slot.borrow().clone());
    if let Some(servers) = cached {
        return servers;
    }

    let from_registry = match crate::registry::fetch_ice_json().await {
        Ok(text) => {
            let candidate = js_sys::JSON::parse(&text)
                .ok()
                .and_then(|json| {
                    js_sys::Reflect::get(&json, &JsValue::from_str("iceServers")).ok()
                })
                .and_then(|field| field.dyn_into::<js_sys::Array>().ok());
            match candidate {
                Some(list) if list.length() > 0 => Some(list),
                _ => None,
            }
        }
        Err(_) => None,
    };

    let servers = match from_registry {
        Some(list) => list,
        None => default_ice(),
    };
    ICE_SERVERS.with(|slot| {
        *slot.borrow_mut() = Some(servers.clone());
    });
    servers
}
/// Creates a peer connection configured with the list from `ice_servers`.
///
/// # Errors
/// Returns the `JsValue` produced when constructing the connection fails.
async fn new_pc() -> Result<RtcPeerConnection, JsValue> {
    let ice = ice_servers().await;
    let config = RtcConfiguration::new();
    config.set_ice_servers(&ice);
    let created = RtcPeerConnection::new_with_configuration(&config);
    created
}
fn open_channels(
pc: &RtcPeerConnection,
mut on_msg: impl FnMut(Vec<u8>) + 'static,
) -> (RtcDataChannel, RtcDataChannel, Closure<dyn FnMut(MessageEvent)>) {
let sync_init = RtcDataChannelInit::new();
sync_init.set_negotiated(true);
sync_init.set_id(CHANNEL_ID);
let sync_dc = pc.create_data_channel_with_data_channel_dict(CHANNEL_LABEL, &sync_init);
sync_dc.set_binary_type(RtcDataChannelType::Arraybuffer);
let game_init = RtcDataChannelInit::new();
game_init.set_negotiated(true);
game_init.set_id(GAME_CHANNEL_ID);
game_init.set_ordered(false);
game_init.set_max_retransmits(0);
let game_dc = pc.create_data_channel_with_data_channel_dict(GAME_CHANNEL_LABEL, &game_init);
game_dc.set_binary_type(RtcDataChannelType::Arraybuffer);
let cb = Closure::wrap(Box::new(move |ev: MessageEvent| {
let data = ev.data();
let bytes = if let Some(buf) = data.dyn_ref::<js_sys::ArrayBuffer>() {
js_sys::Uint8Array::new(buf).to_vec()
} else if let Some(s) = data.as_string() {
s.into_bytes()
} else {
return; };
on_msg(bytes);
}) as Box<dyn FnMut(MessageEvent)>);
sync_dc.set_onmessage(Some(cb.as_ref().unchecked_ref()));
game_dc.set_onmessage(Some(cb.as_ref().unchecked_ref()));
(sync_dc, game_dc, cb)
}
/// Waits until `pc` reports ICE gathering as complete, giving up silently
/// after 100 checks spaced 50 ms apart.
///
/// The state is checked before each sleep; there is no extra check after the
/// final sleep.
async fn wait_for_ice(pc: &RtcPeerConnection) {
    let mut checks_left = 100;
    while checks_left > 0 {
        let state = pc.ice_gathering_state();
        if state == RtcIceGatheringState::Complete {
            return;
        }
        crate::runtime::sleep_ms(50).await;
        checks_left -= 1;
    }
}
/// Returns the SDP text of `pc`'s current local description.
///
/// # Errors
/// Returns `"no local description"` when the connection has none set.
fn local_sdp(pc: &RtcPeerConnection) -> Result<String, JsValue> {
    match pc.local_description() {
        Some(description) => Ok(description.sdp()),
        None => Err(JsValue::from_str("no local description")),
    }
}
impl Peer {
pub(crate) async fn offer(
on_msg: impl FnMut(Vec<u8>) + 'static,
) -> Result<(Self, String), JsValue> {
let pc = new_pc().await?;
let (channel, game, on_message) = open_channels(&pc, on_msg);
let offer = JsFuture::from(pc.create_offer()).await?;
let sdp = js_sys::Reflect::get(&offer, &JsValue::from_str("sdp"))?
.as_string()
.ok_or_else(|| JsValue::from_str("offer has no sdp"))?;
let desc = RtcSessionDescriptionInit::new(RtcSdpType::Offer);
desc.set_sdp(&sdp);
JsFuture::from(pc.set_local_description(&desc)).await?;
wait_for_ice(&pc).await;
let out = local_sdp(&pc)?;
Ok((
Self {
pc,
channel,
game,
_on_message: on_message,
},
out,
))
}
pub(crate) async fn answer(
offer_sdp: &str,
on_msg: impl FnMut(Vec<u8>) + 'static,
) -> Result<(Self, String), JsValue> {
let pc = new_pc().await?;
let (channel, game, on_message) = open_channels(&pc, on_msg);
let remote = RtcSessionDescriptionInit::new(RtcSdpType::Offer);
remote.set_sdp(offer_sdp);
JsFuture::from(pc.set_remote_description(&remote)).await?;
let answer = JsFuture::from(pc.create_answer()).await?;
let sdp = js_sys::Reflect::get(&answer, &JsValue::from_str("sdp"))?
.as_string()
.ok_or_else(|| JsValue::from_str("answer has no sdp"))?;
let desc = RtcSessionDescriptionInit::new(RtcSdpType::Answer);
desc.set_sdp(&sdp);
JsFuture::from(pc.set_local_description(&desc)).await?;
wait_for_ice(&pc).await;
let out = local_sdp(&pc)?;
Ok((
Self {
pc,
channel,
game,
_on_message: on_message,
},
out,
))
}
pub(crate) async fn accept_answer(&self, answer_sdp: &str) -> Result<(), JsValue> {
let remote = RtcSessionDescriptionInit::new(RtcSdpType::Answer);
remote.set_sdp(answer_sdp);
JsFuture::from(self.pc.set_remote_description(&remote)).await?;
Ok(())
}
pub(crate) fn is_open(&self) -> bool {
self.channel.ready_state() == web_sys::RtcDataChannelState::Open
}
pub(crate) fn send(&self, bytes: &[u8]) -> Result<(), JsValue> {
self.channel.send_with_u8_array(bytes)
}
pub(crate) fn send_game(&self, bytes: &[u8]) -> Result<(), JsValue> {
self.game.send_with_u8_array(bytes)
}
pub(crate) fn sender(&self) -> RtcDataChannel {
self.channel.clone()
}
pub(crate) async fn offer_to_host(
room: &str,
joiner_id: &str,
signer: &k256::ecdsa::SigningKey,
on_msg: impl FnMut(Vec<u8>) + 'static,
) -> Result<Self, JsValue> {
let (peer, offer) = Self::offer(on_msg).await?;
crate::registry::signal_post(signer, now_secs(), room, &format!("offer-{joiner_id}"), &offer)
.await
.map_err(|e| JsValue::from_str(&format!("signal_post offer: {e}")))?;
crate::registry::signal_join(signer, now_secs(), room, joiner_id)
.await
.map_err(|e| JsValue::from_str(&format!("signal_join: {e}")))?;
let answer = poll_signal(room, &format!("answer-{joiner_id}"), 60)
.await
.ok_or_else(|| JsValue::from_str("timed out waiting for the host's answer"))?;
peer.accept_answer(&answer).await?;
Ok(peer)
}
pub(crate) async fn answer_joiner(
room: &str,
joiner_id: &str,
signer: &k256::ecdsa::SigningKey,
on_msg: impl FnMut(Vec<u8>) + 'static,
) -> Result<Self, JsValue> {
let offer = poll_signal(room, &format!("offer-{joiner_id}"), 30)
.await
.ok_or_else(|| JsValue::from_str("joiner offer not found"))?;
let (peer, answer) = Self::answer(&offer, on_msg).await?;
crate::registry::signal_post(signer, now_secs(), room, &format!("answer-{joiner_id}"), &answer)
.await
.map_err(|e| JsValue::from_str(&format!("signal_post answer: {e}")))?;
Ok(peer)
}
pub(crate) async fn mesh_offer(
room: &str,
a: i32,
b: i32,
signer: &k256::ecdsa::SigningKey,
on_msg: impl FnMut(Vec<u8>) + 'static,
) -> Result<Self, JsValue> {
let (peer, offer) = Self::offer(on_msg).await?;
crate::registry::signal_post(signer, now_secs(), room, &format!("offer-{a}-{b}"), &offer)
.await
.map_err(|e| JsValue::from_str(&format!("mesh offer post: {e}")))?;
let answer = poll_signal(room, &format!("answer-{a}-{b}"), 40)
.await
.ok_or_else(|| JsValue::from_str("mesh: timed out waiting for the answer"))?;
peer.accept_answer(&answer).await?;
Ok(peer)
}
pub(crate) async fn mesh_answer(
room: &str,
a: i32,
b: i32,
signer: &k256::ecdsa::SigningKey,
on_msg: impl FnMut(Vec<u8>) + 'static,
) -> Result<Self, JsValue> {
let offer = poll_signal(room, &format!("offer-{a}-{b}"), 40)
.await
.ok_or_else(|| JsValue::from_str("mesh: offer not found"))?;
let (peer, answer) = Self::answer(&offer, on_msg).await?;
crate::registry::signal_post(signer, now_secs(), room, &format!("answer-{a}-{b}"), &answer)
.await
.map_err(|e| JsValue::from_str(&format!("mesh answer post: {e}")))?;
Ok(peer)
}
#[allow(dead_code)]
pub(crate) async fn connect_offerer(
room: &str,
signer: &k256::ecdsa::SigningKey,
on_msg: impl FnMut(Vec<u8>) + 'static,
) -> Result<Self, JsValue> {
let (peer, offer) = Self::offer(on_msg).await?;
crate::registry::signal_post(signer, now_secs(), room, "offer", &offer)
.await
.map_err(|e| JsValue::from_str(&format!("signal_post offer: {e}")))?;
let answer = poll_signal(room, "answer", 60)
.await
.ok_or_else(|| JsValue::from_str("timed out waiting for the peer's answer"))?;
peer.accept_answer(&answer).await?;
let _ = crate::registry::signal_clear(signer, now_secs(), room).await;
Ok(peer)
}
#[allow(dead_code)]
pub(crate) async fn connect_answerer(
room: &str,
signer: &k256::ecdsa::SigningKey,
on_msg: impl FnMut(Vec<u8>) + 'static,
) -> Result<Self, JsValue> {
let offer = poll_signal(room, "offer", 60)
.await
.ok_or_else(|| JsValue::from_str("timed out waiting for the peer's offer"))?;
let (peer, answer) = Self::answer(&offer, on_msg).await?;
crate::registry::signal_post(signer, now_secs(), room, "answer", &answer)
.await
.map_err(|e| JsValue::from_str(&format!("signal_post answer: {e}")))?;
Ok(peer)
}
}
/// Current wall-clock time as whole seconds since the Unix epoch, derived
/// from the JavaScript `Date.now()` millisecond clock.
fn now_secs() -> u64 {
    let millis = js_sys::Date::now();
    let secs = millis / 1000.0;
    secs as u64
}
/// Polls the registry for the contents of `slot` in `room`.
///
/// Makes up to `secs` attempts, sleeping 1000 ms after each unsuccessful
/// one. Lookup errors are treated the same as an empty slot. Returns the
/// first value found, or `None` once all attempts are used up.
async fn poll_signal(room: &str, slot: &str, secs: u32) -> Option<String> {
    let mut attempts_left = secs;
    while attempts_left > 0 {
        // Collapse "lookup failed" and "slot empty" into a single `None`.
        let found = crate::registry::signal_get(room, slot).await.ok().flatten();
        if found.is_some() {
            return found;
        }
        crate::runtime::sleep_ms(1000).await;
        attempts_left -= 1;
    }
    None
}
impl Drop for Peer {
    /// Tears the peer down: detaches the `onmessage` handler from both data
    /// channels, closes both channels, then closes the connection itself.
    fn drop(&mut self) {
        let channels = [&self.channel, &self.game];
        // Detach handlers first, before any channel is closed.
        for dc in channels {
            dc.set_onmessage(None);
        }
        for dc in channels {
            dc.close();
        }
        self.pc.close();
    }
}