web_datachannel/
lib.rs

1use wasm_bindgen::JsCast as _;
2
3pub struct PeerConnection {
4    pc: web_sys::RtcPeerConnection,
5}
6
7#[derive(thiserror::Error, Debug)]
8pub struct Error(js_sys::Error);
9
10unsafe impl Send for Error {}
11unsafe impl Sync for Error {}
12
13impl From<wasm_bindgen::JsValue> for Error {
14    fn from(value: wasm_bindgen::JsValue) -> Self {
15        Self(value.into())
16    }
17}
18
19impl std::fmt::Display for Error {
20    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21        let s: String = self.0.to_string().into();
22        write!(f, "{s}")
23    }
24}
25
26pub type SdpType = web_sys::RtcSdpType;
27pub type IceGatheringState = web_sys::RtcIceGatheringState;
28pub type PeerConnectionState = web_sys::RtcPeerConnectionState;
29pub type IceTransportPolicy = web_sys::RtcIceTransportPolicy;
30
31#[derive(Debug, Clone)]
32pub struct Description {
33    pub type_: SdpType,
34    pub sdp: String,
35}
36
37#[derive(Debug, Clone, serde::Serialize)]
38pub struct IceServer {
39    pub urls: Vec<String>,
40    pub username: Option<String>,
41    pub credential: Option<String>,
42}
43
44#[derive(Debug, Clone)]
45pub struct Configuration {
46    pub ice_servers: Vec<IceServer>,
47    pub ice_transport_policy: IceTransportPolicy,
48}
49
50impl Default for Configuration {
51    fn default() -> Self {
52        Self {
53            ice_servers: Default::default(),
54            ice_transport_policy: IceTransportPolicy::All,
55        }
56    }
57}
58
59unsafe impl Send for PeerConnection {}
60unsafe impl Sync for PeerConnection {}
61
62impl PeerConnection {
63    pub fn new(configuration: Configuration) -> Result<Self, Error> {
64        let mut raw = web_sys::RtcConfiguration::new();
65        raw.ice_servers(&serde_wasm_bindgen::to_value(&configuration.ice_servers).unwrap());
66        raw.ice_transport_policy(configuration.ice_transport_policy);
67        Ok(Self {
68            pc: web_sys::RtcPeerConnection::new_with_configuration(&raw)?,
69        })
70    }
71
72    pub fn close(&self) {
73        self.pc.close()
74    }
75
76    pub async fn create_offer(&self) -> Result<Description, Error> {
77        let raw = web_sys::RtcSessionDescription::from(
78            wasm_bindgen_futures::JsFuture::from(self.pc.create_offer()).await?,
79        );
80
81        Ok(Description {
82            type_: raw.type_(),
83            sdp: raw.sdp(),
84        })
85    }
86
87    pub async fn create_answer(&self) -> Result<Description, Error> {
88        let raw = web_sys::RtcSessionDescription::from(
89            wasm_bindgen_futures::JsFuture::from(self.pc.create_answer()).await?,
90        );
91
92        Ok(Description {
93            type_: raw.type_(),
94            sdp: raw.sdp(),
95        })
96    }
97
98    pub async fn set_local_description(&self, description: &Description) -> Result<(), Error> {
99        let mut raw = web_sys::RtcSessionDescriptionInit::new(description.type_);
100        raw.sdp(&description.sdp);
101        wasm_bindgen_futures::JsFuture::from(self.pc.set_local_description(&raw)).await?;
102        Ok(())
103    }
104
105    pub async fn set_remote_description(&self, description: &Description) -> Result<(), Error> {
106        let mut raw = web_sys::RtcSessionDescriptionInit::new(description.type_);
107        raw.sdp(&description.sdp);
108        wasm_bindgen_futures::JsFuture::from(self.pc.set_remote_description(&raw)).await?;
109        Ok(())
110    }
111
112    pub fn create_data_channel(
113        &self,
114        label: &str,
115        options: DataChannelOptions,
116    ) -> Result<DataChannel, Error> {
117        let mut raw = web_sys::RtcDataChannelInit::new();
118        raw.ordered(options.ordered);
119        if let Some(v) = options.max_packet_life_time {
120            raw.max_packet_life_time(v);
121        }
122        if let Some(v) = options.max_retransmits {
123            raw.max_retransmits(v);
124        }
125        raw.protocol(&options.protocol);
126        raw.negotiated(options.negotiated);
127        if let Some(v) = options.id {
128            raw.id(v);
129        }
130        Ok(DataChannel {
131            dc: self
132                .pc
133                .create_data_channel_with_data_channel_dict(label, &raw),
134        })
135    }
136
137    pub fn set_on_ice_candidate(&self, cb: Option<impl Fn(Option<&str>) + Send + Sync + 'static>) {
138        let cb = cb.map(|cb| {
139            wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(
140                move |ev: web_sys::RtcPeerConnectionIceEvent| {
141                    cb(ev
142                        .candidate()
143                        .map(|cand| cand.candidate())
144                        .as_ref()
145                        .map(|v| v.as_str()));
146                },
147            )
148        });
149        self.pc
150            .set_onicecandidate(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
151        if let Some(cb) = cb {
152            cb.forget();
153        }
154    }
155
156    pub fn set_on_ice_gathering_state_change(
157        &self,
158        cb: Option<impl Fn(IceGatheringState) + Send + Sync + 'static>,
159    ) {
160        let pc = self.pc.clone();
161        let cb = cb.map(|cb| {
162            wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(move |_ev: web_sys::Event| {
163                cb(pc.ice_gathering_state());
164            })
165        });
166        self.pc
167            .set_onicegatheringstatechange(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
168        if let Some(cb) = cb {
169            cb.forget();
170        }
171    }
172
173    pub fn set_on_connection_state_change(
174        &self,
175        cb: Option<impl Fn(PeerConnectionState) + Send + Sync + 'static>,
176    ) {
177        let pc = self.pc.clone();
178        let cb = cb.map(|cb| {
179            wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(move |_ev: web_sys::Event| {
180                cb(pc.connection_state());
181            })
182        });
183        self.pc
184            .set_onconnectionstatechange(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
185        if let Some(cb) = cb {
186            cb.forget();
187        }
188    }
189
190    pub fn set_on_data_channel(&self, cb: Option<impl Fn(DataChannel) + Send + Sync + 'static>) {
191        let cb = cb.map(|cb| {
192            wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(
193                move |ev: web_sys::RtcDataChannelEvent| {
194                    cb(DataChannel { dc: ev.channel() });
195                },
196            )
197        });
198        self.pc
199            .set_ondatachannel(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
200        if let Some(cb) = cb {
201            cb.forget();
202        }
203    }
204
205    pub fn local_description(&self) -> Option<Description> {
206        self.pc.local_description().map(|v| Description {
207            type_: v.type_(),
208            sdp: v.sdp(),
209        })
210    }
211
212    pub fn remote_description(&self) -> Option<Description> {
213        self.pc.remote_description().map(|v| Description {
214            type_: v.type_(),
215            sdp: v.sdp(),
216        })
217    }
218
219    pub async fn add_ice_candidate(&self, cand: Option<&str>) -> Result<(), crate::Error> {
220        wasm_bindgen_futures::JsFuture::from(
221            self.pc.add_ice_candidate_with_opt_rtc_ice_candidate(
222                cand.map(|cand| {
223                    let raw = web_sys::RtcIceCandidateInit::new(cand);
224                    web_sys::RtcIceCandidate::new(&raw).unwrap()
225                })
226                .as_ref(),
227            ),
228        )
229        .await?;
230        Ok(())
231    }
232}
233
234impl Drop for PeerConnection {
235    fn drop(&mut self) {
236        self.pc.close();
237    }
238}
239
240pub struct DataChannel {
241    dc: web_sys::RtcDataChannel,
242}
243
244unsafe impl Send for DataChannel {}
245unsafe impl Sync for DataChannel {}
246
247impl DataChannel {
248    pub fn set_on_open(&self, cb: Option<impl Fn() + Send + Sync + 'static>) {
249        let cb = cb.map(|cb| {
250            wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(
251                move |_: web_sys::RtcDataChannelEvent| {
252                    cb();
253                },
254            )
255        });
256        self.dc
257            .set_onopen(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
258        if let Some(cb) = cb {
259            cb.forget();
260        }
261    }
262
263    pub fn set_on_close(&self, cb: Option<impl Fn() + Send + Sync + 'static>) {
264        let cb = cb.map(|cb| {
265            wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(
266                move |_: web_sys::RtcDataChannelEvent| {
267                    cb();
268                },
269            )
270        });
271        self.dc
272            .set_onclose(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
273        if let Some(cb) = cb {
274            cb.forget();
275        }
276    }
277
278    pub fn set_on_buffered_amount_low(&self, cb: Option<impl Fn() + Send + Sync + 'static>) {
279        let cb = cb.map(|cb| {
280            wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(move |_: web_sys::Event| {
281                cb();
282            })
283        });
284        self.dc
285            .set_onbufferedamountlow(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
286        if let Some(cb) = cb {
287            cb.forget();
288        }
289    }
290
291    pub fn set_on_error(&self, cb: Option<impl Fn(Error) + Send + Sync + 'static>) {
292        let cb = cb.map(|cb| {
293            wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(move |ev: web_sys::ErrorEvent| {
294                cb(ev.error().into());
295            })
296        });
297        self.dc
298            .set_onerror(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
299        if let Some(cb) = cb {
300            cb.forget();
301        }
302    }
303
304    pub fn set_on_message(&self, cb: Option<impl Fn(&[u8]) + Send + Sync + 'static>) {
305        let cb = cb.map(|cb| {
306            wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(move |ev: web_sys::MessageEvent| {
307                let arr = match ev.data().dyn_into::<js_sys::ArrayBuffer>() {
308                    Ok(arr) => arr,
309                    Err(e) => {
310                        log::error!("unsupported message: {:?}", e);
311                        return;
312                    }
313                };
314                cb(js_sys::Uint8Array::new(&arr).to_vec().as_slice());
315            })
316        });
317        self.dc
318            .set_onmessage(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
319        if let Some(cb) = cb {
320            cb.forget();
321        }
322    }
323
324    pub fn set_buffered_amount_low_threshold(&self, value: u32) {
325        self.dc.set_buffered_amount_low_threshold(value);
326    }
327
328    pub fn buffered_amount(&self) -> u32 {
329        self.dc.buffered_amount()
330    }
331
332    pub fn close(&self) {
333        self.dc.close();
334    }
335
336    pub fn send(&self, buf: &[u8]) -> Result<(), Error> {
337        self.dc.send_with_u8_array(buf)?;
338        Ok(())
339    }
340}
341
342impl Drop for DataChannel {
343    fn drop(&mut self) {
344        self.dc.close();
345    }
346}
347
348pub struct DataChannelOptions {
349    pub ordered: bool,
350    pub max_packet_life_time: Option<u16>,
351    pub max_retransmits: Option<u16>,
352    pub protocol: String,
353    pub negotiated: bool,
354    pub id: Option<u16>,
355}
356
357impl Default for DataChannelOptions {
358    fn default() -> Self {
359        Self {
360            ordered: true,
361            max_packet_life_time: None,
362            max_retransmits: None,
363            protocol: "".to_string(),
364            negotiated: false,
365            id: None,
366        }
367    }
368}
369
370#[cfg(test)]
371mod test {
372    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
373
374    use super::*;
375    use wasm_bindgen_test::*;
376
377    #[wasm_bindgen_test]
378    pub async fn test_peer_connection_new() {
379        let pc = PeerConnection::new(Default::default()).unwrap();
380        pc.create_data_channel("test", Default::default()).unwrap();
381    }
382
383    #[wasm_bindgen_test]
384    pub async fn test_peer_connection_communicate() {
385        std::panic::set_hook(Box::new(console_error_panic_hook::hook));
386
387        let pc1 = PeerConnection::new(Default::default()).unwrap();
388        let pc1_gathered = std::sync::Arc::new(async_notify::Notify::new());
389        pc1.set_on_ice_gathering_state_change(Some({
390            let pc1_gathered = std::sync::Arc::clone(&pc1_gathered);
391            move |ice_gathering_state| {
392                if ice_gathering_state == IceGatheringState::Complete {
393                    pc1_gathered.notify();
394                }
395            }
396        }));
397
398        let dc1 = pc1
399            .create_data_channel(
400                "test",
401                DataChannelOptions {
402                    negotiated: true,
403                    id: Some(1),
404                    ..Default::default()
405                },
406            )
407            .unwrap();
408        let dc1_open = std::sync::Arc::new(async_notify::Notify::new());
409        dc1.set_on_open(Some({
410            let dc1_open = std::sync::Arc::clone(&dc1_open);
411            move || {
412                dc1_open.notify();
413            }
414        }));
415        pc1.set_local_description(&pc1.create_offer().await.unwrap())
416            .await
417            .unwrap();
418        pc1_gathered.notified().await;
419
420        let pc2 = PeerConnection::new(Default::default()).unwrap();
421        let pc2_gathered = std::sync::Arc::new(async_notify::Notify::new());
422        pc2.set_on_ice_gathering_state_change(Some({
423            let pc2_gathered = std::sync::Arc::clone(&pc2_gathered);
424            move |ice_gathering_state| {
425                if ice_gathering_state == IceGatheringState::Complete {
426                    pc2_gathered.notify();
427                }
428            }
429        }));
430
431        let dc2 = pc2
432            .create_data_channel(
433                "test",
434                DataChannelOptions {
435                    negotiated: true,
436                    id: Some(1),
437                    ..Default::default()
438                },
439            )
440            .unwrap();
441        let dc2_open = std::sync::Arc::new(async_notify::Notify::new());
442        dc2.set_on_open(Some({
443            let dc2_open = std::sync::Arc::clone(&dc2_open);
444            move || {
445                dc2_open.notify();
446            }
447        }));
448
449        let (tx1, rx1) = async_channel::bounded(1);
450        dc1.set_on_message(Some(move |msg: &[u8]| {
451            tx1.try_send(msg.to_vec()).unwrap();
452        }));
453
454        let (tx2, rx2) = async_channel::bounded(1);
455        dc2.set_on_message(Some(move |msg: &[u8]| {
456            tx2.try_send(msg.to_vec()).unwrap();
457        }));
458
459        pc2.set_remote_description(&pc1.local_description().unwrap())
460            .await
461            .unwrap();
462
463        pc2.set_local_description(&pc2.create_answer().await.unwrap())
464            .await
465            .unwrap();
466        pc2_gathered.notified().await;
467
468        pc1.set_remote_description(&pc2.local_description().unwrap())
469            .await
470            .unwrap();
471
472        dc1_open.notified().await;
473        dc2_open.notified().await;
474
475        dc1.send(b"hello world!").unwrap();
476        assert_eq!(rx2.recv().await.unwrap(), b"hello world!");
477
478        dc2.send(b"goodbye world!").unwrap();
479        assert_eq!(rx1.recv().await.unwrap(), b"goodbye world!");
480    }
481}