async_datachannel_wasm/
lib.rs

//! Async wrapper for WebRTC data channels. This aims to be a drop-in replacement for the
//! [`async-datachannel`] crate.
//!
//! [`async-datachannel`]: https://crates.io/crates/async-datachannel
5use std::{rc::Rc, task::Poll};
6
7use anyhow::Context;
8use futures::{
9    channel::mpsc,
10    io::{AsyncRead, AsyncWrite},
11    stream, StreamExt,
12};
13use js_sys::Reflect;
14use log::*;
15use send_wrapper::SendWrapper;
16use serde::{Deserialize, Serialize};
17use wasm_bindgen::{prelude::*, JsCast, JsValue};
18use wasm_bindgen_futures::JsFuture;
19use web_sys::{
20    RtcConfiguration, RtcDataChannel, RtcDataChannelEvent, RtcDataChannelType, RtcIceCandidateInit,
21    RtcIceServer, RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType,
22    RtcSessionDescriptionInit,
23};
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
25pub struct IceCandidate {
26    pub candidate: String,
27    #[serde(rename = "sdpMid")]
28    pub mid: String,
29}
30
31#[derive(Serialize, Deserialize, Debug)]
32// considered opaque
33pub struct SessionDescription {
34    pub sdp: String,
35    #[serde(rename = "type")]
36    pub sdp_type: String,
37}
38
39#[derive(Debug, Serialize, Deserialize)]
40#[serde(untagged)]
41/// Messages to be used for external signalling.
42pub enum Message {
43    RemoteDescription(SessionDescription),
44    RemoteCandidate(IceCandidate),
45}
46
/// Configuration for a [`PeerConnection`]: the ICE server URLs to hand to the browser.
#[derive(Clone, Debug)]
pub struct RtcConfig {
    /// ICE server URLs, stored as owned strings in the order they were given.
    ice_servers: Vec<String>,
}

impl RtcConfig {
    /// Builds a configuration from any slice of string-like ICE server URLs.
    ///
    /// Every entry is copied into an owned `String`; the input order is preserved.
    pub fn new<S: AsRef<str>>(ice_servers: &[S]) -> Self {
        let mut urls = Vec::with_capacity(ice_servers.len());
        for server in ice_servers {
            urls.push(String::from(server.as_ref()));
        }
        Self { ice_servers: urls }
    }
}
59
60/// The opened data channel. This struct implements both [`AsyncRead`] and [`AsyncWrite`].
61pub struct DataStream {
62    /// The actual data channel
63    //    inner: Box<RtcDataChannel<DataChannel>>,
64    /// Receiver for inbound bytes from the data channel
65    rx_inbound: mpsc::Receiver<anyhow::Result<Vec<u8>>>,
66    /// Intermediate buffer of inbound bytes, to be polled by `poll_read`
67    buf_inbound: Vec<u8>,
68    // Reference to the PeerConnection to keep around
69    //   peer_con: Option<Arc<Mutex<Box<RtcPeerConnection<ConnInternal>>>>>,
70    //
71    _on_message: SendWrapper<Closure<dyn FnMut(web_sys::MessageEvent)>>,
72    inner: SendWrapper<Rc<RtcDataChannel>>,
73    // Do we need the peer_con?
74    //peer_con: RtcPeerConnection,
75}
76
77impl DataStream {
78    fn new(inner: RtcDataChannel) -> Self {
79        inner.set_binary_type(RtcDataChannelType::Arraybuffer);
80        let (mut tx, rx_inbound) = mpsc::channel(32);
81        let on_message = Closure::wrap(Box::new(move |ev: web_sys::MessageEvent| {
82            let res = match ev.data().dyn_into::<js_sys::ArrayBuffer>() {
83                Ok(data) => {
84                    let byte_array: Vec<u8> = js_sys::Uint8Array::new(&data).to_vec();
85                    Ok(byte_array)
86                }
87                Err(data) => Err(anyhow::anyhow!(
88                    "Expected ArrayBuffer, received: \"{:?}\"",
89                    data
90                )),
91            };
92            if let Err(e) = tx.try_send(res) {
93                error!("Error sending via channel: {:?}", e);
94            }
95        }) as Box<dyn FnMut(web_sys::MessageEvent)>);
96        inner.set_onmessage(Some(on_message.as_ref().unchecked_ref()));
97        Self {
98            _on_message: SendWrapper::new(on_message),
99            inner: SendWrapper::new(Rc::new(inner)),
100            buf_inbound: vec![],
101            rx_inbound,
102        }
103    }
104}
105
106impl AsyncRead for DataStream {
107    fn poll_read(
108        mut self: std::pin::Pin<&mut Self>,
109        cx: &mut std::task::Context<'_>,
110        buf: &mut [u8],
111    ) -> std::task::Poll<std::io::Result<usize>> {
112        if !self.buf_inbound.is_empty() {
113            let space = buf.len();
114            if self.buf_inbound.len() <= space {
115                let len = self.buf_inbound.len();
116                buf[..len].copy_from_slice(&self.buf_inbound[..]);
117                self.buf_inbound.drain(..);
118                Poll::Ready(Ok(len))
119            } else {
120                buf.copy_from_slice(&self.buf_inbound[..space]);
121                self.buf_inbound.drain(..space);
122                Poll::Ready(Ok(space))
123            }
124        } else {
125            match self.as_mut().rx_inbound.poll_next_unpin(cx) {
126                std::task::Poll::Ready(Some(Ok(x))) => {
127                    let space = buf.len();
128                    if x.len() <= space {
129                        buf[..x.len()].copy_from_slice(&x[..]);
130                        Poll::Ready(Ok(x.len()))
131                    } else {
132                        buf.copy_from_slice(&x[..space]);
133                        self.buf_inbound.extend_from_slice(&x[space..]);
134                        Poll::Ready(Ok(space))
135                    }
136                }
137                std::task::Poll::Ready(Some(Err(e))) => Poll::Ready(Err(std::io::Error::new(
138                    std::io::ErrorKind::Other,
139                    e.to_string(),
140                ))),
141                std::task::Poll::Ready(None) => Poll::Ready(Ok(0)),
142                Poll::Pending => Poll::Pending,
143            }
144        }
145    }
146}
147
148impl AsyncWrite for DataStream {
149    fn poll_write(
150        mut self: std::pin::Pin<&mut Self>,
151        _cx: &mut std::task::Context<'_>,
152        buf: &[u8],
153    ) -> std::task::Poll<Result<usize, std::io::Error>> {
154        // TODO: Maybe query the underlying buffer to signal backpressure
155        if let Err(e) = self.as_mut().inner.send_with_u8_array(buf) {
156            Poll::Ready(Err(std::io::Error::new(
157                std::io::ErrorKind::Other,
158                format!("{:?}", e),
159            )))
160        } else {
161            Poll::Ready(Ok(buf.len()))
162        }
163    }
164
165    fn poll_flush(
166        self: std::pin::Pin<&mut Self>,
167        _cx: &mut std::task::Context<'_>,
168    ) -> std::task::Poll<Result<(), std::io::Error>> {
169        Poll::Ready(Ok(()))
170    }
171
172    fn poll_close(
173        self: std::pin::Pin<&mut Self>,
174        _cx: &mut std::task::Context<'_>,
175    ) -> std::task::Poll<Result<(), std::io::Error>> {
176        Poll::Ready(Ok(()))
177    }
178}
179
180pub struct PeerConnection {
181    //    peer_con: Arc<Mutex<Box<RtcPeerConnection<ConnInternal>>>>,
182    //rx_incoming: mpsc::Receiver<DataStream>,
183    inner: SendWrapper<Rc<RtcPeerConnection>>,
184    sig_tx: mpsc::Sender<Message>,
185    sig_rx: mpsc::Receiver<Message>,
186    _on_ice_candidate: SendWrapper<Closure<dyn FnMut(RtcPeerConnectionIceEvent)>>,
187}
188
189impl PeerConnection {
190    /// Create a new [`PeerConnection`] to be used for either dialing or accepting an inbound
191    /// connection. The channel tuple is used to interface with an external signalling system.
192    pub fn new(
193        config: &RtcConfig,
194        (sig_tx, sig_rx): (mpsc::Sender<Message>, mpsc::Receiver<Message>),
195    ) -> anyhow::Result<Self> {
196        let mut rtc_config = RtcConfiguration::new();
197
198        let ice_servers = js_sys::Array::new();
199        for s in &config.ice_servers {
200            // TODO: handle stun?
201            let mut stun_server = RtcIceServer::new();
202            let stun_servers = js_sys::Array::new();
203            stun_servers.push(&JsValue::from(s));
204            stun_server.urls(&stun_servers);
205            ice_servers.push(&JsValue::from(&stun_server));
206        }
207        rtc_config.ice_servers(&ice_servers);
208
209        let inner = RtcPeerConnection::new_with_configuration(&rtc_config)
210            .map_err(|e| anyhow::anyhow!("Error creating peer connection {:?}", e.as_string()))?;
211
212        let mut sig_tx_c = sig_tx.clone();
213        let on_ice_candidate = Closure::wrap(Box::new(move |ev: RtcPeerConnectionIceEvent| {
214            if let Some(candidate) = ev.candidate() {
215                if let Err(e) = sig_tx_c.try_send(Message::RemoteCandidate(IceCandidate {
216                    candidate: candidate.candidate(),
217                    mid: candidate.sdp_mid().unwrap_or_default(),
218                })) {
219                    error!("Sending via sig_tx failed {:?}", e);
220                }
221            }
222        })
223            as Box<dyn FnMut(RtcPeerConnectionIceEvent)>);
224
225        inner.set_onicecandidate(Some(on_ice_candidate.as_ref().unchecked_ref()));
226        Ok(Self {
227            inner: SendWrapper::new(Rc::new(inner)),
228            sig_rx,
229            sig_tx,
230            _on_ice_candidate: SendWrapper::new(on_ice_candidate),
231        })
232    }
233
234    /// Wait for an inbound connection.
235    /// wait for remote offer
236    /// set_remote_desc(&offer)
237    /// create answer(&offer)
238    /// set_local_desc(&answer)
239    /// send(&answer)
240    pub async fn accept(self) -> anyhow::Result<DataStream> {
241        let Self {
242            inner,
243            sig_rx,
244            mut sig_tx,
245            ..
246        } = self;
247        enum Either<A, B> {
248            Left(A),
249            Right(B),
250        }
251        let (mut tx_open, mut rx_open) = mpsc::channel(1);
252        let (mut tx_chan, rx_chan) = mpsc::channel(1);
253
254        let on_open = Closure::wrap(Box::new(move || {
255            trace!("Inbound data channel opened");
256            tx_open.try_send(()).expect("channel diend l226");
257        }) as Box<dyn FnMut()>);
258        let on_data_channel = Closure::wrap(Box::new(move |ev: RtcDataChannelEvent| {
259            trace!("Inbound connection attempt");
260            let channel = ev.channel();
261            channel.set_onopen(Some(on_open.as_ref().unchecked_ref()));
262            if let Err(e) = tx_chan.try_send(channel) {
263                error!("err sending via channel {:?}", e);
264            }
265        }) as Box<dyn FnMut(RtcDataChannelEvent)>);
266        inner.set_ondatachannel(Some(on_data_channel.as_ref().unchecked_ref()));
267        let mut s = stream::select(sig_rx.map(Either::Left), rx_chan.map(Either::Right));
268
269        while let Some(m) = s.next().await {
270            match m {
271                Either::Left(remote_msg) => match remote_msg {
272                    Message::RemoteDescription(desc) => {
273                        if desc.sdp_type == "offer" {
274                            trace!("Received offer from remote");
275                            let mut description = RtcSessionDescriptionInit::new(RtcSdpType::Offer);
276                            description.sdp(&desc.sdp);
277                            JsFuture::from(inner.set_remote_description(&description))
278                                .await
279                                .map_err(|e| {
280                                    anyhow::anyhow!("Error setting remote description: {:?}", e)
281                                })?;
282
283                            let answer = JsFuture::from(inner.create_answer())
284                                .await
285                                .map_err(|e| anyhow::anyhow!("Error creating answer: {:?}", e))?;
286                            let answer_sdp = Reflect::get(&answer, &JsValue::from_str("sdp"))
287                                .map_err(|e| {
288                                    anyhow::anyhow!("Error extracting sdp from answer: {:?}", e)
289                                })?
290                                .as_string()
291                                .unwrap();
292                            let mut answer_obj = RtcSessionDescriptionInit::new(RtcSdpType::Answer);
293                            answer_obj.sdp(&answer_sdp);
294                            JsFuture::from(inner.set_local_description(&answer_obj))
295                                .await
296                                .map_err(|e| {
297                                    anyhow::anyhow!("Error setting local description: {:?}", e)
298                                })?;
299
300                            if let Err(e) =
301                                sig_tx.try_send(Message::RemoteDescription(SessionDescription {
302                                    sdp_type: "answer".into(),
303                                    sdp: answer_sdp,
304                                }))
305                            {
306                                error!("Error sending answer via channel: {:?}", e);
307                            } else {
308                                trace!("Sent answer to remote");
309                            }
310                        }
311                    }
312                    Message::RemoteCandidate(c) => {
313                        let mut cand = RtcIceCandidateInit::new(&c.candidate);
314                        cand.sdp_mid(Some(&c.mid));
315                        JsFuture::from(
316                            inner.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&cand)),
317                        )
318                        .await
319                        .map_err(|e| anyhow::anyhow!("Error adding ice candidate: {:?}", e))?;
320                    }
321                },
322                Either::Right(dc) => {
323                    // Forget them closures
324                    inner.set_onicecandidate(None);
325                    inner.set_ondatachannel(None);
326
327                    rx_open.next().await.context("Waiting for open")?;
328                    dc.set_onopen(None);
329                    return Ok(DataStream::new(dc));
330                }
331            }
332        }
333        anyhow::bail!("Channel didn't open");
334    }
335
336    /// Initiate an outbound dialing.
337    /// dial
338    /// create offer
339    /// set local_description(&offer)
340    /// send(offer)
341    /// wait for remote answer
342    /// set_remote_description(&answer)
343    pub async fn dial(self, label: &str) -> anyhow::Result<DataStream> {
344        let Self {
345            mut sig_tx,
346            inner,
347            sig_rx,
348            ..
349        } = self;
350        let dc = inner.create_data_channel(label);
351        enum Either<A, B> {
352            Left(A),
353            Right(B),
354        }
355        let (mut tx_open, rx_open) = mpsc::channel::<()>(1);
356
357        let on_open = Closure::wrap(Box::new(move || {
358            trace!("Outbound Datachannel opened");
359            if let Err(e) = tx_open.try_send(()) {
360                error!("Error sending opening event: {:?}", e);
361            }
362        }) as Box<dyn FnMut()>);
363        dc.set_onopen(Some(on_open.as_ref().unchecked_ref()));
364        let mut s = stream::select(sig_rx.map(Either::Left), rx_open.map(Either::Right));
365
366        let offer = JsFuture::from(inner.create_offer())
367            .await
368            .map_err(|e| anyhow::anyhow!("Error creating offer: {:?}", e))?;
369        let offer_sdp = Reflect::get(&offer, &JsValue::from_str("sdp"))
370            .map_err(|e| anyhow::anyhow!("Error extracting sdp from offer: {:?}", e))?
371            .as_string()
372            .unwrap();
373
374        let mut offer_obj = RtcSessionDescriptionInit::new(RtcSdpType::Offer);
375        offer_obj.sdp(&offer_sdp);
376        let sld_promise = inner.set_local_description(&offer_obj);
377        JsFuture::from(sld_promise)
378            .await
379            .map_err(|e| anyhow::anyhow!("Error setting local description: {:?}", e))?;
380        sig_tx
381            .try_send(Message::RemoteDescription(SessionDescription {
382                sdp_type: "offer".into(),
383                sdp: offer_sdp,
384            }))
385            .context("Signaling channel closed")?;
386
387        while let Some(m) = s.next().await {
388            match m {
389                Either::Left(remote_msg) => match remote_msg {
390                    Message::RemoteDescription(desc) => {
391                        if desc.sdp_type == "answer" {
392                            let mut description = RtcSessionDescriptionInit::new(
393                                RtcSdpType::from_js_value(&JsValue::from_str(&desc.sdp_type))
394                                    .context("Error creating rtc session description")?,
395                            );
396                            description.sdp(&desc.sdp);
397                            JsFuture::from(inner.set_remote_description(&description))
398                                .await
399                                .map_err(|e| {
400                                    anyhow::anyhow!("Error setting remote description: {:?}", e)
401                                })?;
402                        }
403                    }
404                    Message::RemoteCandidate(c) => {
405                        let mut cand = RtcIceCandidateInit::new(&c.candidate);
406                        cand.sdp_mid(Some(&c.mid));
407                        JsFuture::from(
408                            inner.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&cand)),
409                        )
410                        .await
411                        .map_err(|e| anyhow::anyhow!("Error adding ice candidate: {:?}", e))?;
412                    }
413                },
414                Either::Right(_) => {
415                    // Forget them closures
416                    inner.set_onicecandidate(None);
417                    dc.set_onopen(None);
418
419                    return Ok(DataStream::new(dc));
420                }
421            }
422        }
423
424        anyhow::bail!("Channel didn't open");
425    }
426}