1use wasm_bindgen::JsCast as _;
2
3pub struct PeerConnection {
4 pc: web_sys::RtcPeerConnection,
5}
6
7#[derive(thiserror::Error, Debug)]
8pub struct Error(js_sys::Error);
9
10unsafe impl Send for Error {}
11unsafe impl Sync for Error {}
12
13impl From<wasm_bindgen::JsValue> for Error {
14 fn from(value: wasm_bindgen::JsValue) -> Self {
15 Self(value.into())
16 }
17}
18
19impl std::fmt::Display for Error {
20 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21 let s: String = self.0.to_string().into();
22 write!(f, "{s}")
23 }
24}
25
26pub type SdpType = web_sys::RtcSdpType;
27pub type IceGatheringState = web_sys::RtcIceGatheringState;
28pub type PeerConnectionState = web_sys::RtcPeerConnectionState;
29pub type IceTransportPolicy = web_sys::RtcIceTransportPolicy;
30
31#[derive(Debug, Clone)]
32pub struct Description {
33 pub type_: SdpType,
34 pub sdp: String,
35}
36
37#[derive(Debug, Clone, serde::Serialize)]
38pub struct IceServer {
39 pub urls: Vec<String>,
40 pub username: Option<String>,
41 pub credential: Option<String>,
42}
43
44#[derive(Debug, Clone)]
45pub struct Configuration {
46 pub ice_servers: Vec<IceServer>,
47 pub ice_transport_policy: IceTransportPolicy,
48}
49
50impl Default for Configuration {
51 fn default() -> Self {
52 Self {
53 ice_servers: Default::default(),
54 ice_transport_policy: IceTransportPolicy::All,
55 }
56 }
57}
58
59unsafe impl Send for PeerConnection {}
60unsafe impl Sync for PeerConnection {}
61
62impl PeerConnection {
63 pub fn new(configuration: Configuration) -> Result<Self, Error> {
64 let mut raw = web_sys::RtcConfiguration::new();
65 raw.ice_servers(&serde_wasm_bindgen::to_value(&configuration.ice_servers).unwrap());
66 raw.ice_transport_policy(configuration.ice_transport_policy);
67 Ok(Self {
68 pc: web_sys::RtcPeerConnection::new_with_configuration(&raw)?,
69 })
70 }
71
72 pub fn close(&self) {
73 self.pc.close()
74 }
75
76 pub async fn create_offer(&self) -> Result<Description, Error> {
77 let raw = web_sys::RtcSessionDescription::from(
78 wasm_bindgen_futures::JsFuture::from(self.pc.create_offer()).await?,
79 );
80
81 Ok(Description {
82 type_: raw.type_(),
83 sdp: raw.sdp(),
84 })
85 }
86
87 pub async fn create_answer(&self) -> Result<Description, Error> {
88 let raw = web_sys::RtcSessionDescription::from(
89 wasm_bindgen_futures::JsFuture::from(self.pc.create_answer()).await?,
90 );
91
92 Ok(Description {
93 type_: raw.type_(),
94 sdp: raw.sdp(),
95 })
96 }
97
98 pub async fn set_local_description(&self, description: &Description) -> Result<(), Error> {
99 let mut raw = web_sys::RtcSessionDescriptionInit::new(description.type_);
100 raw.sdp(&description.sdp);
101 wasm_bindgen_futures::JsFuture::from(self.pc.set_local_description(&raw)).await?;
102 Ok(())
103 }
104
105 pub async fn set_remote_description(&self, description: &Description) -> Result<(), Error> {
106 let mut raw = web_sys::RtcSessionDescriptionInit::new(description.type_);
107 raw.sdp(&description.sdp);
108 wasm_bindgen_futures::JsFuture::from(self.pc.set_remote_description(&raw)).await?;
109 Ok(())
110 }
111
112 pub fn create_data_channel(
113 &self,
114 label: &str,
115 options: DataChannelOptions,
116 ) -> Result<DataChannel, Error> {
117 let mut raw = web_sys::RtcDataChannelInit::new();
118 raw.ordered(options.ordered);
119 if let Some(v) = options.max_packet_life_time {
120 raw.max_packet_life_time(v);
121 }
122 if let Some(v) = options.max_retransmits {
123 raw.max_retransmits(v);
124 }
125 raw.protocol(&options.protocol);
126 raw.negotiated(options.negotiated);
127 if let Some(v) = options.id {
128 raw.id(v);
129 }
130 Ok(DataChannel {
131 dc: self
132 .pc
133 .create_data_channel_with_data_channel_dict(label, &raw),
134 })
135 }
136
137 pub fn set_on_ice_candidate(&self, cb: Option<impl Fn(Option<&str>) + Send + Sync + 'static>) {
138 let cb = cb.map(|cb| {
139 wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(
140 move |ev: web_sys::RtcPeerConnectionIceEvent| {
141 cb(ev
142 .candidate()
143 .map(|cand| cand.candidate())
144 .as_ref()
145 .map(|v| v.as_str()));
146 },
147 )
148 });
149 self.pc
150 .set_onicecandidate(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
151 if let Some(cb) = cb {
152 cb.forget();
153 }
154 }
155
156 pub fn set_on_ice_gathering_state_change(
157 &self,
158 cb: Option<impl Fn(IceGatheringState) + Send + Sync + 'static>,
159 ) {
160 let pc = self.pc.clone();
161 let cb = cb.map(|cb| {
162 wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(move |_ev: web_sys::Event| {
163 cb(pc.ice_gathering_state());
164 })
165 });
166 self.pc
167 .set_onicegatheringstatechange(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
168 if let Some(cb) = cb {
169 cb.forget();
170 }
171 }
172
173 pub fn set_on_connection_state_change(
174 &self,
175 cb: Option<impl Fn(PeerConnectionState) + Send + Sync + 'static>,
176 ) {
177 let pc = self.pc.clone();
178 let cb = cb.map(|cb| {
179 wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(move |_ev: web_sys::Event| {
180 cb(pc.connection_state());
181 })
182 });
183 self.pc
184 .set_onconnectionstatechange(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
185 if let Some(cb) = cb {
186 cb.forget();
187 }
188 }
189
190 pub fn set_on_data_channel(&self, cb: Option<impl Fn(DataChannel) + Send + Sync + 'static>) {
191 let cb = cb.map(|cb| {
192 wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(
193 move |ev: web_sys::RtcDataChannelEvent| {
194 cb(DataChannel { dc: ev.channel() });
195 },
196 )
197 });
198 self.pc
199 .set_ondatachannel(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
200 if let Some(cb) = cb {
201 cb.forget();
202 }
203 }
204
205 pub fn local_description(&self) -> Option<Description> {
206 self.pc.local_description().map(|v| Description {
207 type_: v.type_(),
208 sdp: v.sdp(),
209 })
210 }
211
212 pub fn remote_description(&self) -> Option<Description> {
213 self.pc.remote_description().map(|v| Description {
214 type_: v.type_(),
215 sdp: v.sdp(),
216 })
217 }
218
219 pub async fn add_ice_candidate(&self, cand: Option<&str>) -> Result<(), crate::Error> {
220 wasm_bindgen_futures::JsFuture::from(
221 self.pc.add_ice_candidate_with_opt_rtc_ice_candidate(
222 cand.map(|cand| {
223 let raw = web_sys::RtcIceCandidateInit::new(cand);
224 web_sys::RtcIceCandidate::new(&raw).unwrap()
225 })
226 .as_ref(),
227 ),
228 )
229 .await?;
230 Ok(())
231 }
232}
233
234impl Drop for PeerConnection {
235 fn drop(&mut self) {
236 self.pc.close();
237 }
238}
239
240pub struct DataChannel {
241 dc: web_sys::RtcDataChannel,
242}
243
244unsafe impl Send for DataChannel {}
245unsafe impl Sync for DataChannel {}
246
247impl DataChannel {
248 pub fn set_on_open(&self, cb: Option<impl Fn() + Send + Sync + 'static>) {
249 let cb = cb.map(|cb| {
250 wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(
251 move |_: web_sys::RtcDataChannelEvent| {
252 cb();
253 },
254 )
255 });
256 self.dc
257 .set_onopen(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
258 if let Some(cb) = cb {
259 cb.forget();
260 }
261 }
262
263 pub fn set_on_close(&self, cb: Option<impl Fn() + Send + Sync + 'static>) {
264 let cb = cb.map(|cb| {
265 wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(
266 move |_: web_sys::RtcDataChannelEvent| {
267 cb();
268 },
269 )
270 });
271 self.dc
272 .set_onclose(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
273 if let Some(cb) = cb {
274 cb.forget();
275 }
276 }
277
278 pub fn set_on_buffered_amount_low(&self, cb: Option<impl Fn() + Send + Sync + 'static>) {
279 let cb = cb.map(|cb| {
280 wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(move |_: web_sys::Event| {
281 cb();
282 })
283 });
284 self.dc
285 .set_onbufferedamountlow(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
286 if let Some(cb) = cb {
287 cb.forget();
288 }
289 }
290
291 pub fn set_on_error(&self, cb: Option<impl Fn(Error) + Send + Sync + 'static>) {
292 let cb = cb.map(|cb| {
293 wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(move |ev: web_sys::ErrorEvent| {
294 cb(ev.error().into());
295 })
296 });
297 self.dc
298 .set_onerror(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
299 if let Some(cb) = cb {
300 cb.forget();
301 }
302 }
303
304 pub fn set_on_message(&self, cb: Option<impl Fn(&[u8]) + Send + Sync + 'static>) {
305 let cb = cb.map(|cb| {
306 wasm_bindgen::closure::Closure::<dyn FnMut(_)>::new(move |ev: web_sys::MessageEvent| {
307 let arr = match ev.data().dyn_into::<js_sys::ArrayBuffer>() {
308 Ok(arr) => arr,
309 Err(e) => {
310 log::error!("unsupported message: {:?}", e);
311 return;
312 }
313 };
314 cb(js_sys::Uint8Array::new(&arr).to_vec().as_slice());
315 })
316 });
317 self.dc
318 .set_onmessage(cb.as_ref().map(|cb| cb.as_ref().unchecked_ref()));
319 if let Some(cb) = cb {
320 cb.forget();
321 }
322 }
323
324 pub fn set_buffered_amount_low_threshold(&self, value: u32) {
325 self.dc.set_buffered_amount_low_threshold(value);
326 }
327
328 pub fn buffered_amount(&self) -> u32 {
329 self.dc.buffered_amount()
330 }
331
332 pub fn close(&self) {
333 self.dc.close();
334 }
335
336 pub fn send(&self, buf: &[u8]) -> Result<(), Error> {
337 self.dc.send_with_u8_array(buf)?;
338 Ok(())
339 }
340}
341
342impl Drop for DataChannel {
343 fn drop(&mut self) {
344 self.dc.close();
345 }
346}
347
/// Options applied when creating a data channel.
///
/// Each field is forwarded to the browser's data channel init dictionary;
/// `Option` fields are only forwarded when they are `Some`.
#[derive(Debug, Clone)]
pub struct DataChannelOptions {
    /// Forwarded as the `ordered` setting.
    pub ordered: bool,
    /// Forwarded as the maximum packet lifetime when set.
    pub max_packet_life_time: Option<u16>,
    /// Forwarded as the maximum retransmit count when set.
    pub max_retransmits: Option<u16>,
    /// Forwarded as the channel's protocol string.
    pub protocol: String,
    /// Forwarded as the `negotiated` setting.
    pub negotiated: bool,
    /// Forwarded as the channel id when set.
    pub id: Option<u16>,
}

impl Default for DataChannelOptions {
    /// Returns ordered, non-negotiated options with an empty protocol and every
    /// optional field unset.
    fn default() -> Self {
        Self {
            ordered: true,
            max_packet_life_time: None,
            max_retransmits: None,
            protocol: String::new(),
            negotiated: false,
            id: None,
        }
    }
}
369
370#[cfg(test)]
371mod test {
372 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
373
374 use super::*;
375 use wasm_bindgen_test::*;
376
377 #[wasm_bindgen_test]
378 pub async fn test_peer_connection_new() {
379 let pc = PeerConnection::new(Default::default()).unwrap();
380 pc.create_data_channel("test", Default::default()).unwrap();
381 }
382
383 #[wasm_bindgen_test]
384 pub async fn test_peer_connection_communicate() {
385 std::panic::set_hook(Box::new(console_error_panic_hook::hook));
386
387 let pc1 = PeerConnection::new(Default::default()).unwrap();
388 let pc1_gathered = std::sync::Arc::new(async_notify::Notify::new());
389 pc1.set_on_ice_gathering_state_change(Some({
390 let pc1_gathered = std::sync::Arc::clone(&pc1_gathered);
391 move |ice_gathering_state| {
392 if ice_gathering_state == IceGatheringState::Complete {
393 pc1_gathered.notify();
394 }
395 }
396 }));
397
398 let dc1 = pc1
399 .create_data_channel(
400 "test",
401 DataChannelOptions {
402 negotiated: true,
403 id: Some(1),
404 ..Default::default()
405 },
406 )
407 .unwrap();
408 let dc1_open = std::sync::Arc::new(async_notify::Notify::new());
409 dc1.set_on_open(Some({
410 let dc1_open = std::sync::Arc::clone(&dc1_open);
411 move || {
412 dc1_open.notify();
413 }
414 }));
415 pc1.set_local_description(&pc1.create_offer().await.unwrap())
416 .await
417 .unwrap();
418 pc1_gathered.notified().await;
419
420 let pc2 = PeerConnection::new(Default::default()).unwrap();
421 let pc2_gathered = std::sync::Arc::new(async_notify::Notify::new());
422 pc2.set_on_ice_gathering_state_change(Some({
423 let pc2_gathered = std::sync::Arc::clone(&pc2_gathered);
424 move |ice_gathering_state| {
425 if ice_gathering_state == IceGatheringState::Complete {
426 pc2_gathered.notify();
427 }
428 }
429 }));
430
431 let dc2 = pc2
432 .create_data_channel(
433 "test",
434 DataChannelOptions {
435 negotiated: true,
436 id: Some(1),
437 ..Default::default()
438 },
439 )
440 .unwrap();
441 let dc2_open = std::sync::Arc::new(async_notify::Notify::new());
442 dc2.set_on_open(Some({
443 let dc2_open = std::sync::Arc::clone(&dc2_open);
444 move || {
445 dc2_open.notify();
446 }
447 }));
448
449 let (tx1, rx1) = async_channel::bounded(1);
450 dc1.set_on_message(Some(move |msg: &[u8]| {
451 tx1.try_send(msg.to_vec()).unwrap();
452 }));
453
454 let (tx2, rx2) = async_channel::bounded(1);
455 dc2.set_on_message(Some(move |msg: &[u8]| {
456 tx2.try_send(msg.to_vec()).unwrap();
457 }));
458
459 pc2.set_remote_description(&pc1.local_description().unwrap())
460 .await
461 .unwrap();
462
463 pc2.set_local_description(&pc2.create_answer().await.unwrap())
464 .await
465 .unwrap();
466 pc2_gathered.notified().await;
467
468 pc1.set_remote_description(&pc2.local_description().unwrap())
469 .await
470 .unwrap();
471
472 dc1_open.notified().await;
473 dc2_open.notified().await;
474
475 dc1.send(b"hello world!").unwrap();
476 assert_eq!(rx2.recv().await.unwrap(), b"hello world!");
477
478 dc2.send(b"goodbye world!").unwrap();
479 assert_eq!(rx1.recv().await.unwrap(), b"goodbye world!");
480 }
481}