1use flmodules::broker::Broker;
2use futures::lock::Mutex;
3use js_sys::Reflect;
4use log::{error, warn};
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7use wasm_bindgen::{prelude::*, JsCast};
8use wasm_bindgen_futures::JsFuture;
9use web_sys::{
10 MessageEvent, RtcConfiguration, RtcDataChannelState, RtcIceConnectionState,
11 RtcIceGatheringState,
12};
13
14use flnet::web_rtc::{
15 messages::{
16 ConnType, ConnectionStateMap, DataChannelState, IceConnectionState, IceGatheringState,
17 PeerMessage, SetupError, SignalingState, WebRTCInput, WebRTCMessage, WebRTCOutput,
18 },
19 node_connection::Direction,
20};
21
22use web_sys::{
23 Event, RtcDataChannel, RtcDataChannelEvent, RtcIceCandidate, RtcIceCandidateInit,
24 RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType, RtcSessionDescriptionInit,
25 RtcSignalingState,
26};
27
28pub struct WebRTCConnectionSetup {
29 pub rp_conn: RtcPeerConnection,
30 rtc_data: Arc<Mutex<Option<RtcDataChannel>>>,
31 broker: Broker<WebRTCMessage>,
32 queue: Vec<String>,
34 direction: Option<Direction>,
35}
36
37#[derive(Serialize, Deserialize)]
38struct IceServer<'a> {
39 urls: &'a str,
40 username: Option<&'a str>,
41 credential: Option<&'a str>,
42}
43
44impl WebRTCConnectionSetup {
45 pub async fn new(broker: Broker<WebRTCMessage>) -> Result<WebRTCConnectionSetup, SetupError> {
46 Ok(WebRTCConnectionSetup {
47 rp_conn: Self::create_rp_conn()?,
48 rtc_data: Arc::new(Mutex::new(None)),
49 broker,
50 queue: vec![],
51 direction: None,
52 })
53 }
54
55 pub fn create_rp_conn() -> Result<RtcPeerConnection, SetupError> {
56 let mut config = RtcConfiguration::new();
59 let servers_obj = vec![
60 IceServer {
61 urls: "stun:stun.l.google.com:19302",
62 username: None,
63 credential: None,
64 },
65 IceServer {
66 urls: "turn:web.fledg.re:3478",
67 username: Some("something"),
68 credential: Some("something"),
69 },
70 ];
71 let servers =
72 JsValue::from_serde(&servers_obj).map_err(|e| SetupError::SetupFail(e.to_string()))?;
73 config.ice_servers(&servers);
74 RtcPeerConnection::new_with_configuration(&config)
75 .map_err(|e| SetupError::SetupFail(format!("PeerConnection error: {:?}", e)))
76 }
77
78 pub fn reset(&mut self) -> Result<(), SetupError> {
79 let empty_callback = Closure::wrap(Box::new(move |_: MessageEvent| {
80 log::warn!("Got callback after reset");
81 }) as Box<dyn FnMut(MessageEvent)>);
82
83 if let Some(rtc_data_opt) = self.rtc_data.try_lock() {
84 if let Some(rtc_data) = rtc_data_opt.as_ref() {
85 rtc_data.set_onmessage(Some(empty_callback.as_ref().unchecked_ref()));
86 rtc_data.set_onopen(Some(empty_callback.as_ref().unchecked_ref()));
87 }
88 }
89 self.rp_conn
90 .set_onicecandidate(Some(empty_callback.as_ref().unchecked_ref()));
91 self.rp_conn
92 .set_ondatachannel(Some(empty_callback.as_ref().unchecked_ref()));
93
94 empty_callback.forget();
95
96 self.rp_conn.close();
97 self.rp_conn = Self::create_rp_conn()?;
98 WebRTCConnectionSetup::ice_start(&self.rp_conn, self.broker.clone());
99 self.direction = None;
100 if let Some(mut rd) = self.rtc_data.try_lock() {
101 rd.as_ref().map(|r| r.close());
102 *rd = None;
103 }
104 Ok(())
105 }
106
107 pub fn ice_start(rp_conn: &RtcPeerConnection, broker: Broker<WebRTCMessage>) {
108 let broker_cl = broker.clone();
109 let onicecandidate_callback1 =
110 Closure::wrap(Box::new(move |ev: RtcPeerConnectionIceEvent| {
111 let mut broker = broker_cl.clone();
112 if let Some(candidate) = ev.candidate() {
113 let cand = format!("{}", candidate.candidate());
114 wasm_bindgen_futures::spawn_local(async move {
115 broker
116 .emit_msg(WebRTCMessage::Output(WebRTCOutput::Setup(
117 PeerMessage::IceCandidate(cand),
118 )))
119 .await
120 .err()
121 .map(|e| log::error!("While sending ICE candidate: {:?}", e));
122 });
123 }
124 }) as Box<dyn FnMut(RtcPeerConnectionIceEvent)>);
125 rp_conn.set_onicecandidate(Some(onicecandidate_callback1.as_ref().unchecked_ref()));
126 onicecandidate_callback1.forget();
127 let broker_cl = broker.clone();
128 let rp_conn_cl = rp_conn.clone();
129 let oniceconnectionstatechange =
130 Closure::wrap(Box::new(move |_: RtcPeerConnectionIceEvent| {
131 let msg = match rp_conn_cl.ice_connection_state() {
132 RtcIceConnectionState::Failed | RtcIceConnectionState::Disconnected => {
133 WebRTCMessage::Output(WebRTCOutput::Disconnected)
134 }
135 _ => WebRTCMessage::Input(WebRTCInput::UpdateState),
136 };
137 let mut broker = broker_cl.clone();
138 wasm_bindgen_futures::spawn_local(async move {
139 broker
140 .emit_msg(msg)
141 .await
142 .err()
143 .map(|e| log::error!("While sending ICE candidate: {:?}", e));
144 });
145 }) as Box<dyn FnMut(RtcPeerConnectionIceEvent)>);
146 rp_conn.set_oniceconnectionstatechange(Some(
147 oniceconnectionstatechange.as_ref().unchecked_ref(),
148 ));
149 oniceconnectionstatechange.forget();
150 }
151
152 pub async fn make_offer(&mut self) -> Result<String, SetupError> {
154 if self.direction.is_some() {
155 log::warn!("Resetting with offer in already opened connection");
156 self.reset()?;
157 };
158 self.direction = Some(Direction::Outgoing);
159
160 let dc = self.rp_conn.create_data_channel("data-channel");
161 Self::dc_set_onopen(self.broker.clone(), self.rtc_data.clone(), dc);
162
163 let co = self.rp_conn.create_offer();
164 let offer = JsFuture::from(co)
165 .await
166 .map_err(|e| SetupError::SetupFail(format!("{:?}", e)))?;
167 let offer_sdp = Reflect::get(&offer, &JsValue::from_str("sdp"))
168 .map_err(|e| SetupError::SetupFail(format!("{:?}", e)))?
169 .as_string()
170 .unwrap();
171
172 let mut offer_obj = RtcSessionDescriptionInit::new(RtcSdpType::Offer);
173 offer_obj.sdp(&offer_sdp);
174 let sld_promise = self.rp_conn.set_local_description(&offer_obj);
175 JsFuture::from(sld_promise)
176 .await
177 .map_err(|e| SetupError::SetupFail(format!("{:?}", e)))?;
178 Ok(offer_sdp)
179 }
180
181 pub async fn make_answer(&mut self, offer: String) -> Result<String, SetupError> {
183 if self.direction.is_some() {
184 log::warn!("Resetting with offer in already opened connection");
185 self.reset()?;
186 };
187 self.direction = Some(Direction::Incoming);
188
189 self.dc_create_follow();
190
191 let mut offer_obj = RtcSessionDescriptionInit::new(RtcSdpType::Offer);
192 offer_obj.sdp(&offer);
193 let srd_promise = self.rp_conn.set_remote_description(&offer_obj);
194 JsFuture::from(srd_promise)
195 .await
196 .map_err(|e| SetupError::SetupFail(e.as_string().unwrap()))?;
197
198 let answer = match JsFuture::from(self.rp_conn.create_answer()).await {
199 Ok(f) => f,
200 Err(e) => {
201 error!("Error answer: {:?}", e);
202 return Err(SetupError::SetupFail(e.as_string().unwrap()));
203 }
204 };
205 let answer_sdp = Reflect::get(&answer, &JsValue::from_str("sdp"))
206 .map_err(|e| SetupError::SetupFail(e.as_string().unwrap()))?
207 .as_string()
208 .unwrap();
209
210 let mut answer_obj = RtcSessionDescriptionInit::new(RtcSdpType::Answer);
211 answer_obj.sdp(&answer_sdp);
212 let sld_promise = self.rp_conn.set_local_description(&answer_obj);
213 JsFuture::from(sld_promise)
214 .await
215 .map_err(|e| SetupError::SetupFail(e.as_string().unwrap()))?;
216 Ok(answer_sdp)
217 }
218
219 pub async fn use_answer(&mut self, answer: String) -> Result<(), SetupError> {
221 let dir = self
222 .direction
223 .clone()
224 .ok_or_else(|| SetupError::SetupFail("Direction not set".to_string()))?;
225 (dir == Direction::Outgoing)
226 .then(|| ())
227 .ok_or_else(|| SetupError::SetupFail("Should be outgoing direction".to_string()))?;
228
229 if self.rp_conn.signaling_state() == RtcSignalingState::Stable {
230 return Ok(());
231 }
232 let mut answer_obj = RtcSessionDescriptionInit::new(RtcSdpType::Answer);
233 answer_obj.sdp(&answer);
234 let srd_promise = self.rp_conn.set_remote_description(&answer_obj);
235 JsFuture::from(srd_promise)
236 .await
237 .map_err(|e| SetupError::SetupFail(format!("{:?}", e)))?;
238 Ok(())
239 }
240
241 pub async fn ice_put(&mut self, ice: String) -> Result<(), SetupError> {
243 let mut ric_init = RtcIceCandidateInit::new(&ice);
244 ric_init.sdp_mid(Some("0"));
245 ric_init.sdp_m_line_index(Some(0u16));
246 match RtcIceCandidate::new(&ric_init) {
247 Ok(e) => {
248 if let Err(err) = wasm_bindgen_futures::JsFuture::from(
249 self.rp_conn
250 .add_ice_candidate_with_opt_rtc_ice_candidate(Some(&e)),
251 )
252 .await
253 {
254 warn!("Couldn't add ice candidate: {:?}", err);
255 }
256 Ok(())
257 }
258 Err(err) => Err(SetupError::SetupFail(format!(
259 "Couldn't consume ice: {:?}",
260 err
261 ))),
262 }
263 .map_err(|js| SetupError::SetupFail(js.to_string()))
264 }
265
266 pub async fn send(&mut self, msg: String) -> Result<(), SetupError> {
267 self.queue.push(msg);
268 self.send_queue().await
269 }
270
271 pub async fn send_queue(&mut self) -> Result<(), SetupError> {
272 let state = self.get_state().await?;
273 if let Some(state) = state.data_connection {
274 if state == DataChannelState::Open {
275 let rtc_data = self.rtc_data.try_lock().unwrap();
276 if let Some(ref mut data_channel) = rtc_data.as_ref() {
277 for msg_queue in self.queue.drain(..) {
278 data_channel
279 .send_with_str(&msg_queue)
280 .map_err(|e| SetupError::Send(format!("{e:?}")))?;
281 }
282 return Ok(());
283 }
284 }
285 }
286 Ok(())
287 }
288
289 fn dc_set_onopen(
290 broker: Broker<WebRTCMessage>,
291 rtc_data: Arc<Mutex<Option<RtcDataChannel>>>,
292 dc: RtcDataChannel,
293 ) {
294 let dc_clone = dc.clone();
295 let ondatachannel_open = Closure::wrap(Box::new(move |_ev: Event| {
296 let mut broker_clone = broker.clone();
297 let rtc_data = Arc::clone(&rtc_data);
298 let dc_clone2 = dc_clone.clone();
299 wasm_bindgen_futures::spawn_local(async move {
300 rtc_data.lock().await.replace(dc_clone2.clone());
301 broker_clone
302 .emit_msg(WebRTCMessage::Output(WebRTCOutput::Connected))
303 .await
304 .err()
305 .map(|e| log::error!("While sending connection: {:?}", e));
306 });
307
308 let broker_cl = broker.clone();
309 let onmessage_callback = Closure::wrap(Box::new(move |ev: MessageEvent| {
310 if let Some(message) = ev.data().as_string() {
311 let mut broker = broker_cl.clone();
312 wasm_bindgen_futures::spawn_local(async move {
313 broker
314 .emit_msg(WebRTCMessage::Output(WebRTCOutput::Text(message)))
315 .await
316 .err()
317 .map(|e| log::error!("While sending message: {:?}", e));
318 });
319 }
320 }) as Box<dyn FnMut(MessageEvent)>);
321 dc_clone.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
322 onmessage_callback.forget();
323
324 let broker_cl = broker.clone();
325 let onerror_callback = Closure::wrap(Box::new(move |ev: MessageEvent| {
326 let mut broker = broker_cl.clone();
327 wasm_bindgen_futures::spawn_local(async move {
328 broker
329 .emit_msg(WebRTCMessage::Output(WebRTCOutput::Error(format!(
330 "{:?}",
331 ev
332 ))))
333 .await
334 .err()
335 .map(|e| log::error!("While sending message: {:?}", e));
336 });
337 }) as Box<dyn FnMut(MessageEvent)>);
338 dc_clone.set_onclose(Some(onerror_callback.as_ref().unchecked_ref()));
339 onerror_callback.forget();
340 }) as Box<dyn FnMut(Event)>);
341 dc.set_onopen(Some(ondatachannel_open.as_ref().unchecked_ref()));
342 ondatachannel_open.forget();
343 }
344
345 fn dc_create_follow(&self) {
346 let broker = self.broker.clone();
347 let rtc_data = self.rtc_data.clone();
348 let ondatachannel_callback = Closure::wrap(Box::new(move |ev: RtcDataChannelEvent| {
349 Self::dc_set_onopen(broker.clone(), rtc_data.clone(), ev.channel());
350 })
351 as Box<dyn FnMut(RtcDataChannelEvent)>);
352 self.rp_conn
353 .set_ondatachannel(Some(ondatachannel_callback.as_ref().unchecked_ref()));
354 ondatachannel_callback.forget();
355 }
356
357 pub async fn get_state(&self) -> Result<ConnectionStateMap, SetupError> {
358 let stats = self.rp_conn.get_stats();
359 let conn_stats: js_sys::Map = wasm_bindgen_futures::JsFuture::from(stats)
360 .await
361 .unwrap()
362 .into();
363
364 let mut type_remote = ConnType::Unknown;
366 conn_stats.for_each(&mut |k, _v| {
367 let s = format!("{:?}", k);
368 if s.contains("candidateType\":\"srflx") {
369 type_remote = ConnType::STUNServer;
370 } else if s.contains("candidateType\":\"prflx") {
371 type_remote = ConnType::STUNPeer;
372 } else if s.contains("candidateType\":\"relay") {
373 type_remote = ConnType::TURN;
374 } else if s.contains("candidateType\":\"host") {
375 type_remote = ConnType::Host;
376 }
377 });
378
379 let signaling = match self.rp_conn.signaling_state() {
380 RtcSignalingState::Stable => SignalingState::Stable,
381 RtcSignalingState::Closed => SignalingState::Closed,
382 _ => SignalingState::Setup,
383 };
384
385 let ice_gathering = match self.rp_conn.ice_gathering_state() {
386 RtcIceGatheringState::New => IceGatheringState::New,
387 RtcIceGatheringState::Gathering => IceGatheringState::Gathering,
388 RtcIceGatheringState::Complete => IceGatheringState::Complete,
389 RtcIceGatheringState::__Nonexhaustive => IceGatheringState::New,
390 };
391
392 let ice_connection = match self.rp_conn.ice_connection_state() {
393 RtcIceConnectionState::New => IceConnectionState::New,
394 RtcIceConnectionState::Checking => IceConnectionState::Checking,
395 RtcIceConnectionState::Connected => IceConnectionState::Connected,
396 RtcIceConnectionState::Completed => IceConnectionState::Completed,
397 RtcIceConnectionState::Failed => IceConnectionState::Failed,
398 RtcIceConnectionState::Disconnected => IceConnectionState::Disconnected,
399 RtcIceConnectionState::Closed => IceConnectionState::Closed,
400 RtcIceConnectionState::__Nonexhaustive => IceConnectionState::New,
401 };
402
403 let mut data_connection = None;
404 if let Some(rtc_data) = self.rtc_data.try_lock() {
405 if let Some(rtc_data_ref) = rtc_data.as_ref() {
406 data_connection = Some(match rtc_data_ref.ready_state() {
407 RtcDataChannelState::Connecting => DataChannelState::Connecting,
408 RtcDataChannelState::Open => DataChannelState::Open,
409 RtcDataChannelState::Closing => DataChannelState::Closing,
410 RtcDataChannelState::Closed => DataChannelState::Closed,
411 RtcDataChannelState::__Nonexhaustive => DataChannelState::Closed,
412 });
413 }
414 }
415
416 Ok(ConnectionStateMap {
417 ice_gathering,
418 ice_connection,
419 data_connection,
420 signaling,
421 delay_ms: 0,
422 tx_bytes: 0,
423 rx_bytes: 0,
424 type_remote,
425 type_local: type_remote,
426 })
427 }
428}