use alloc::string::{String, ToString};
use alloc::sync::Arc;
use alloc::vec::Vec;
use bevy_ecs::prelude::*;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_channel::mpsc;
use futures_core::Stream;
use crate::IceConfig;
pub(crate) enum WebRtcEvent {
Connected,
Disconnected(String),
Error(String),
}
#[derive(Component)]
pub struct WebRtcChannels {
pub data_rx: mpsc::UnboundedReceiver<Vec<u8>>,
pub data_tx: mpsc::UnboundedSender<Vec<u8>>,
pub event_rx: mpsc::UnboundedReceiver<WebRtcEvent>,
}
#[derive(Component)]
pub struct PendingWebRtcConnection {
pub(crate) result: Arc<std::sync::Mutex<Option<PendingResult>>>,
pub(crate) error: Arc<std::sync::Mutex<Option<String>>>,
}
pub(crate) struct PendingResult {
pub data_tx: mpsc::UnboundedSender<Vec<u8>>,
pub data_rx: mpsc::UnboundedReceiver<Vec<u8>>,
pub event_rx: mpsc::UnboundedReceiver<WebRtcEvent>,
}
pub(crate) fn build_rtc_config(ice_config: &IceConfig) -> web_sys::RtcConfiguration {
let ice_servers = js_sys::Array::new();
for server in &ice_config.ice_servers {
let rtc_ice_server = web_sys::RtcIceServer::new();
let urls = js_sys::Array::new();
for url in &server.urls {
urls.push(&wasm_bindgen::JsValue::from_str(url));
}
rtc_ice_server.set_urls(&urls);
if let Some(username) = &server.username {
rtc_ice_server.set_username(username);
}
if let Some(credential) = &server.credential {
rtc_ice_server.set_credential(credential);
}
ice_servers.push(&rtc_ice_server);
}
let config = web_sys::RtcConfiguration::new();
config.set_ice_servers(&ice_servers);
config
}
pub(crate) fn create_data_channel_init() -> web_sys::RtcDataChannelInit {
let init = web_sys::RtcDataChannelInit::new();
init.set_ordered(false);
init.set_max_retransmits(0);
init
}
struct Next<'a, T> {
receiver: &'a mut mpsc::UnboundedReceiver<T>,
}
impl<T> Future for Next<'_, T> {
type Output = Option<T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.receiver).poll_next(cx) {
Poll::Ready(item) => Poll::Ready(item),
Poll::Pending => Poll::Pending,
}
}
}
fn next<T>(receiver: &mut mpsc::UnboundedReceiver<T>) -> Next<'_, T> {
Next { receiver }
}
pub(crate) async fn wait_for_ice_gathering(pc: &web_sys::RtcPeerConnection) -> Result<String, crate::WebRtcError> {
use wasm_bindgen::prelude::*;
if pc.ice_gathering_state() == web_sys::RtcIceGatheringState::Complete {
return pc.local_description()
.ok_or_else(|| crate::WebRtcError::Connection("no local description".into()))
.map(|desc| desc.sdp());
}
let (tx, rx) = futures_channel::oneshot::channel::<()>();
let tx = std::sync::Mutex::new(Some(tx));
let cb = Closure::<dyn FnMut()>::new({
let pc = pc.clone();
move || {
if pc.ice_gathering_state() == web_sys::RtcIceGatheringState::Complete {
if let Some(tx) = tx.lock().unwrap().take() {
let _ = tx.send(());
}
}
}
});
pc.set_onicegatheringstatechange(Some(cb.as_ref().unchecked_ref()));
cb.forget();
if pc.ice_gathering_state() == web_sys::RtcIceGatheringState::Complete {
return pc.local_description()
.ok_or_else(|| crate::WebRtcError::Connection("no local description".into()))
.map(|desc| desc.sdp());
}
rx.await.map_err(|_| crate::WebRtcError::Connection("ICE gathering cancelled".into()))?;
pc.local_description()
.ok_or_else(|| crate::WebRtcError::Connection("no local description after ICE".into()))
.map(|desc| desc.sdp())
}
pub(crate) fn wire_data_channel(
dc: &web_sys::RtcDataChannel,
) -> (
mpsc::UnboundedSender<Vec<u8>>,
mpsc::UnboundedReceiver<Vec<u8>>,
mpsc::UnboundedReceiver<WebRtcEvent>,
) {
use wasm_bindgen::prelude::*;
dc.set_binary_type(web_sys::RtcDataChannelType::Arraybuffer);
let (incoming_tx, incoming_rx) = mpsc::unbounded::<Vec<u8>>();
let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Vec<u8>>();
let (event_tx, event_rx) = mpsc::unbounded::<WebRtcEvent>();
let tx = incoming_tx.clone();
let onmessage = Closure::<dyn FnMut(web_sys::MessageEvent)>::new(move |ev: web_sys::MessageEvent| {
if let Ok(buf) = ev.data().dyn_into::<js_sys::ArrayBuffer>() {
let array = js_sys::Uint8Array::new(&buf);
let data = array.to_vec();
let _ = tx.unbounded_send(data);
}
});
dc.set_onmessage(Some(onmessage.as_ref().unchecked_ref()));
onmessage.forget();
let tx = event_tx.clone();
let onopen = Closure::<dyn FnMut()>::new(move || {
let _ = tx.unbounded_send(WebRtcEvent::Connected);
});
dc.set_onopen(Some(onopen.as_ref().unchecked_ref()));
onopen.forget();
let tx = event_tx.clone();
let onclose = Closure::<dyn FnMut()>::new(move || {
let _ = tx.unbounded_send(WebRtcEvent::Disconnected("DataChannel closed".to_string()));
});
dc.set_onclose(Some(onclose.as_ref().unchecked_ref()));
onclose.forget();
let tx = event_tx.clone();
let onerror = Closure::<dyn FnMut()>::new(move || {
let _ = tx.unbounded_send(WebRtcEvent::Error("DataChannel error".to_string()));
});
dc.set_onerror(Some(onerror.as_ref().unchecked_ref()));
onerror.forget();
let dc_clone = dc.clone();
let mut outgoing_rx = outgoing_rx;
wasm_bindgen_futures::spawn_local(async move {
while let Some(data) = next(&mut outgoing_rx).await {
let array = js_sys::Uint8Array::from(data.as_slice());
let _ = dc_clone.send_with_array_buffer_view(&array);
}
});
(outgoing_tx, incoming_rx, event_rx)
}