1use std::{rc::Rc, task::Poll};
6
7use anyhow::Context;
8use futures::{
9 channel::mpsc,
10 io::{AsyncRead, AsyncWrite},
11 stream, StreamExt,
12};
13use js_sys::Reflect;
14use log::*;
15use send_wrapper::SendWrapper;
16use serde::{Deserialize, Serialize};
17use wasm_bindgen::{prelude::*, JsCast, JsValue};
18use wasm_bindgen_futures::JsFuture;
19use web_sys::{
20 RtcConfiguration, RtcDataChannel, RtcDataChannelEvent, RtcDataChannelType, RtcIceCandidateInit,
21 RtcIceServer, RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType,
22 RtcSessionDescriptionInit,
23};
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
25pub struct IceCandidate {
26 pub candidate: String,
27 #[serde(rename = "sdpMid")]
28 pub mid: String,
29}
30
31#[derive(Serialize, Deserialize, Debug)]
32pub struct SessionDescription {
34 pub sdp: String,
35 #[serde(rename = "type")]
36 pub sdp_type: String,
37}
38
39#[derive(Debug, Serialize, Deserialize)]
40#[serde(untagged)]
41pub enum Message {
43 RemoteDescription(SessionDescription),
44 RemoteCandidate(IceCandidate),
45}
46
/// Configuration used when creating a peer connection.
#[derive(Clone, Debug)]
pub struct RtcConfig {
    /// URLs of the ICE servers, in the order they were supplied.
    ice_servers: Vec<String>,
}

impl RtcConfig {
    /// Builds a configuration from a list of ICE server URLs.
    ///
    /// Every entry is copied into an owned `String`; the input order is kept
    /// and an empty slice yields a configuration without any ICE server.
    pub fn new<S: AsRef<str>>(ice_servers: &[S]) -> Self {
        let mut urls = Vec::with_capacity(ice_servers.len());
        for server in ice_servers {
            urls.push(String::from(server.as_ref()));
        }
        Self { ice_servers: urls }
    }
}
59
60pub struct DataStream {
62 rx_inbound: mpsc::Receiver<anyhow::Result<Vec<u8>>>,
66 buf_inbound: Vec<u8>,
68 _on_message: SendWrapper<Closure<dyn FnMut(web_sys::MessageEvent)>>,
72 inner: SendWrapper<Rc<RtcDataChannel>>,
73 }
76
77impl DataStream {
78 fn new(inner: RtcDataChannel) -> Self {
79 inner.set_binary_type(RtcDataChannelType::Arraybuffer);
80 let (mut tx, rx_inbound) = mpsc::channel(32);
81 let on_message = Closure::wrap(Box::new(move |ev: web_sys::MessageEvent| {
82 let res = match ev.data().dyn_into::<js_sys::ArrayBuffer>() {
83 Ok(data) => {
84 let byte_array: Vec<u8> = js_sys::Uint8Array::new(&data).to_vec();
85 Ok(byte_array)
86 }
87 Err(data) => Err(anyhow::anyhow!(
88 "Expected ArrayBuffer, received: \"{:?}\"",
89 data
90 )),
91 };
92 if let Err(e) = tx.try_send(res) {
93 error!("Error sending via channel: {:?}", e);
94 }
95 }) as Box<dyn FnMut(web_sys::MessageEvent)>);
96 inner.set_onmessage(Some(on_message.as_ref().unchecked_ref()));
97 Self {
98 _on_message: SendWrapper::new(on_message),
99 inner: SendWrapper::new(Rc::new(inner)),
100 buf_inbound: vec![],
101 rx_inbound,
102 }
103 }
104}
105
106impl AsyncRead for DataStream {
107 fn poll_read(
108 mut self: std::pin::Pin<&mut Self>,
109 cx: &mut std::task::Context<'_>,
110 buf: &mut [u8],
111 ) -> std::task::Poll<std::io::Result<usize>> {
112 if !self.buf_inbound.is_empty() {
113 let space = buf.len();
114 if self.buf_inbound.len() <= space {
115 let len = self.buf_inbound.len();
116 buf[..len].copy_from_slice(&self.buf_inbound[..]);
117 self.buf_inbound.drain(..);
118 Poll::Ready(Ok(len))
119 } else {
120 buf.copy_from_slice(&self.buf_inbound[..space]);
121 self.buf_inbound.drain(..space);
122 Poll::Ready(Ok(space))
123 }
124 } else {
125 match self.as_mut().rx_inbound.poll_next_unpin(cx) {
126 std::task::Poll::Ready(Some(Ok(x))) => {
127 let space = buf.len();
128 if x.len() <= space {
129 buf[..x.len()].copy_from_slice(&x[..]);
130 Poll::Ready(Ok(x.len()))
131 } else {
132 buf.copy_from_slice(&x[..space]);
133 self.buf_inbound.extend_from_slice(&x[space..]);
134 Poll::Ready(Ok(space))
135 }
136 }
137 std::task::Poll::Ready(Some(Err(e))) => Poll::Ready(Err(std::io::Error::new(
138 std::io::ErrorKind::Other,
139 e.to_string(),
140 ))),
141 std::task::Poll::Ready(None) => Poll::Ready(Ok(0)),
142 Poll::Pending => Poll::Pending,
143 }
144 }
145 }
146}
147
148impl AsyncWrite for DataStream {
149 fn poll_write(
150 mut self: std::pin::Pin<&mut Self>,
151 _cx: &mut std::task::Context<'_>,
152 buf: &[u8],
153 ) -> std::task::Poll<Result<usize, std::io::Error>> {
154 if let Err(e) = self.as_mut().inner.send_with_u8_array(buf) {
156 Poll::Ready(Err(std::io::Error::new(
157 std::io::ErrorKind::Other,
158 format!("{:?}", e),
159 )))
160 } else {
161 Poll::Ready(Ok(buf.len()))
162 }
163 }
164
165 fn poll_flush(
166 self: std::pin::Pin<&mut Self>,
167 _cx: &mut std::task::Context<'_>,
168 ) -> std::task::Poll<Result<(), std::io::Error>> {
169 Poll::Ready(Ok(()))
170 }
171
172 fn poll_close(
173 self: std::pin::Pin<&mut Self>,
174 _cx: &mut std::task::Context<'_>,
175 ) -> std::task::Poll<Result<(), std::io::Error>> {
176 Poll::Ready(Ok(()))
177 }
178}
179
180pub struct PeerConnection {
181 inner: SendWrapper<Rc<RtcPeerConnection>>,
184 sig_tx: mpsc::Sender<Message>,
185 sig_rx: mpsc::Receiver<Message>,
186 _on_ice_candidate: SendWrapper<Closure<dyn FnMut(RtcPeerConnectionIceEvent)>>,
187}
188
189impl PeerConnection {
190 pub fn new(
193 config: &RtcConfig,
194 (sig_tx, sig_rx): (mpsc::Sender<Message>, mpsc::Receiver<Message>),
195 ) -> anyhow::Result<Self> {
196 let mut rtc_config = RtcConfiguration::new();
197
198 let ice_servers = js_sys::Array::new();
199 for s in &config.ice_servers {
200 let mut stun_server = RtcIceServer::new();
202 let stun_servers = js_sys::Array::new();
203 stun_servers.push(&JsValue::from(s));
204 stun_server.urls(&stun_servers);
205 ice_servers.push(&JsValue::from(&stun_server));
206 }
207 rtc_config.ice_servers(&ice_servers);
208
209 let inner = RtcPeerConnection::new_with_configuration(&rtc_config)
210 .map_err(|e| anyhow::anyhow!("Error creating peer connection {:?}", e.as_string()))?;
211
212 let mut sig_tx_c = sig_tx.clone();
213 let on_ice_candidate = Closure::wrap(Box::new(move |ev: RtcPeerConnectionIceEvent| {
214 if let Some(candidate) = ev.candidate() {
215 if let Err(e) = sig_tx_c.try_send(Message::RemoteCandidate(IceCandidate {
216 candidate: candidate.candidate(),
217 mid: candidate.sdp_mid().unwrap_or_default(),
218 })) {
219 error!("Sending via sig_tx failed {:?}", e);
220 }
221 }
222 })
223 as Box<dyn FnMut(RtcPeerConnectionIceEvent)>);
224
225 inner.set_onicecandidate(Some(on_ice_candidate.as_ref().unchecked_ref()));
226 Ok(Self {
227 inner: SendWrapper::new(Rc::new(inner)),
228 sig_rx,
229 sig_tx,
230 _on_ice_candidate: SendWrapper::new(on_ice_candidate),
231 })
232 }
233
234 pub async fn accept(self) -> anyhow::Result<DataStream> {
241 let Self {
242 inner,
243 sig_rx,
244 mut sig_tx,
245 ..
246 } = self;
247 enum Either<A, B> {
248 Left(A),
249 Right(B),
250 }
251 let (mut tx_open, mut rx_open) = mpsc::channel(1);
252 let (mut tx_chan, rx_chan) = mpsc::channel(1);
253
254 let on_open = Closure::wrap(Box::new(move || {
255 trace!("Inbound data channel opened");
256 tx_open.try_send(()).expect("channel diend l226");
257 }) as Box<dyn FnMut()>);
258 let on_data_channel = Closure::wrap(Box::new(move |ev: RtcDataChannelEvent| {
259 trace!("Inbound connection attempt");
260 let channel = ev.channel();
261 channel.set_onopen(Some(on_open.as_ref().unchecked_ref()));
262 if let Err(e) = tx_chan.try_send(channel) {
263 error!("err sending via channel {:?}", e);
264 }
265 }) as Box<dyn FnMut(RtcDataChannelEvent)>);
266 inner.set_ondatachannel(Some(on_data_channel.as_ref().unchecked_ref()));
267 let mut s = stream::select(sig_rx.map(Either::Left), rx_chan.map(Either::Right));
268
269 while let Some(m) = s.next().await {
270 match m {
271 Either::Left(remote_msg) => match remote_msg {
272 Message::RemoteDescription(desc) => {
273 if desc.sdp_type == "offer" {
274 trace!("Received offer from remote");
275 let mut description = RtcSessionDescriptionInit::new(RtcSdpType::Offer);
276 description.sdp(&desc.sdp);
277 JsFuture::from(inner.set_remote_description(&description))
278 .await
279 .map_err(|e| {
280 anyhow::anyhow!("Error setting remote description: {:?}", e)
281 })?;
282
283 let answer = JsFuture::from(inner.create_answer())
284 .await
285 .map_err(|e| anyhow::anyhow!("Error creating answer: {:?}", e))?;
286 let answer_sdp = Reflect::get(&answer, &JsValue::from_str("sdp"))
287 .map_err(|e| {
288 anyhow::anyhow!("Error extracting sdp from answer: {:?}", e)
289 })?
290 .as_string()
291 .unwrap();
292 let mut answer_obj = RtcSessionDescriptionInit::new(RtcSdpType::Answer);
293 answer_obj.sdp(&answer_sdp);
294 JsFuture::from(inner.set_local_description(&answer_obj))
295 .await
296 .map_err(|e| {
297 anyhow::anyhow!("Error setting local description: {:?}", e)
298 })?;
299
300 if let Err(e) =
301 sig_tx.try_send(Message::RemoteDescription(SessionDescription {
302 sdp_type: "answer".into(),
303 sdp: answer_sdp,
304 }))
305 {
306 error!("Error sending answer via channel: {:?}", e);
307 } else {
308 trace!("Sent answer to remote");
309 }
310 }
311 }
312 Message::RemoteCandidate(c) => {
313 let mut cand = RtcIceCandidateInit::new(&c.candidate);
314 cand.sdp_mid(Some(&c.mid));
315 JsFuture::from(
316 inner.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&cand)),
317 )
318 .await
319 .map_err(|e| anyhow::anyhow!("Error adding ice candidate: {:?}", e))?;
320 }
321 },
322 Either::Right(dc) => {
323 inner.set_onicecandidate(None);
325 inner.set_ondatachannel(None);
326
327 rx_open.next().await.context("Waiting for open")?;
328 dc.set_onopen(None);
329 return Ok(DataStream::new(dc));
330 }
331 }
332 }
333 anyhow::bail!("Channel didn't open");
334 }
335
336 pub async fn dial(self, label: &str) -> anyhow::Result<DataStream> {
344 let Self {
345 mut sig_tx,
346 inner,
347 sig_rx,
348 ..
349 } = self;
350 let dc = inner.create_data_channel(label);
351 enum Either<A, B> {
352 Left(A),
353 Right(B),
354 }
355 let (mut tx_open, rx_open) = mpsc::channel::<()>(1);
356
357 let on_open = Closure::wrap(Box::new(move || {
358 trace!("Outbound Datachannel opened");
359 if let Err(e) = tx_open.try_send(()) {
360 error!("Error sending opening event: {:?}", e);
361 }
362 }) as Box<dyn FnMut()>);
363 dc.set_onopen(Some(on_open.as_ref().unchecked_ref()));
364 let mut s = stream::select(sig_rx.map(Either::Left), rx_open.map(Either::Right));
365
366 let offer = JsFuture::from(inner.create_offer())
367 .await
368 .map_err(|e| anyhow::anyhow!("Error creating offer: {:?}", e))?;
369 let offer_sdp = Reflect::get(&offer, &JsValue::from_str("sdp"))
370 .map_err(|e| anyhow::anyhow!("Error extracting sdp from offer: {:?}", e))?
371 .as_string()
372 .unwrap();
373
374 let mut offer_obj = RtcSessionDescriptionInit::new(RtcSdpType::Offer);
375 offer_obj.sdp(&offer_sdp);
376 let sld_promise = inner.set_local_description(&offer_obj);
377 JsFuture::from(sld_promise)
378 .await
379 .map_err(|e| anyhow::anyhow!("Error setting local description: {:?}", e))?;
380 sig_tx
381 .try_send(Message::RemoteDescription(SessionDescription {
382 sdp_type: "offer".into(),
383 sdp: offer_sdp,
384 }))
385 .context("Signaling channel closed")?;
386
387 while let Some(m) = s.next().await {
388 match m {
389 Either::Left(remote_msg) => match remote_msg {
390 Message::RemoteDescription(desc) => {
391 if desc.sdp_type == "answer" {
392 let mut description = RtcSessionDescriptionInit::new(
393 RtcSdpType::from_js_value(&JsValue::from_str(&desc.sdp_type))
394 .context("Error creating rtc session description")?,
395 );
396 description.sdp(&desc.sdp);
397 JsFuture::from(inner.set_remote_description(&description))
398 .await
399 .map_err(|e| {
400 anyhow::anyhow!("Error setting remote description: {:?}", e)
401 })?;
402 }
403 }
404 Message::RemoteCandidate(c) => {
405 let mut cand = RtcIceCandidateInit::new(&c.candidate);
406 cand.sdp_mid(Some(&c.mid));
407 JsFuture::from(
408 inner.add_ice_candidate_with_opt_rtc_ice_candidate_init(Some(&cand)),
409 )
410 .await
411 .map_err(|e| anyhow::anyhow!("Error adding ice candidate: {:?}", e))?;
412 }
413 },
414 Either::Right(_) => {
415 inner.set_onicecandidate(None);
417 dc.set_onopen(None);
418
419 return Ok(DataStream::new(dc));
420 }
421 }
422 }
423
424 anyhow::bail!("Channel didn't open");
425 }
426}