Skip to main content

flnet_wasm/
web_rtc_setup.rs

1use flmodules::broker::Broker;
2use futures::lock::Mutex;
3use js_sys::Reflect;
4use log::{error, warn};
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7use wasm_bindgen::{prelude::*, JsCast};
8use wasm_bindgen_futures::JsFuture;
9use web_sys::{
10    MessageEvent, RtcConfiguration, RtcDataChannelState, RtcIceConnectionState,
11    RtcIceGatheringState,
12};
13
14use flnet::web_rtc::{
15    messages::{
16        ConnType, ConnectionStateMap, DataChannelState, IceConnectionState, IceGatheringState,
17        PeerMessage, SetupError, SignalingState, WebRTCInput, WebRTCMessage, WebRTCOutput,
18    },
19    node_connection::Direction,
20};
21
22use web_sys::{
23    Event, RtcDataChannel, RtcDataChannelEvent, RtcIceCandidate, RtcIceCandidateInit,
24    RtcPeerConnection, RtcPeerConnectionIceEvent, RtcSdpType, RtcSessionDescriptionInit,
25    RtcSignalingState,
26};
27
/// State needed to set up one WebRTC peer connection and its data channel.
///
/// Messages produced by the browser callbacks are emitted on `broker`; text
/// passed to `send` is kept in `queue` until the data channel reports `Open`.
pub struct WebRTCConnectionSetup {
    /// The browser peer connection; `reset` closes it and replaces it with a new one.
    pub rp_conn: RtcPeerConnection,
    /// The data channel, stored by the `open` handler; `None` before that and after `reset`.
    rtc_data: Arc<Mutex<Option<RtcDataChannel>>>,
    /// Broker on which the callbacks emit their `WebRTCMessage`s.
    broker: Broker<WebRTCMessage>,
    // While the connection is not up, queue up messages in here.
    queue: Vec<String>,
    /// `Outgoing` after `make_offer`, `Incoming` after `make_answer`,
    /// `None` initially and after `reset`.
    direction: Option<Direction>,
}
36
/// One ICE server entry; a list of these is serialized with serde and passed
/// to `RtcConfiguration::ice_servers` in `create_rp_conn`.
#[derive(Serialize, Deserialize)]
struct IceServer<'a> {
    /// URL of the server, e.g. `stun:stun.l.google.com:19302`.
    urls: &'a str,
    /// Username for the server; `None` for the STUN entry built in this file.
    username: Option<&'a str>,
    /// Credential for the server; `None` for the STUN entry built in this file.
    credential: Option<&'a str>,
}
43
44impl WebRTCConnectionSetup {
45    pub async fn new(broker: Broker<WebRTCMessage>) -> Result<WebRTCConnectionSetup, SetupError> {
46        Ok(WebRTCConnectionSetup {
47            rp_conn: Self::create_rp_conn()?,
48            rtc_data: Arc::new(Mutex::new(None)),
49            broker,
50            queue: vec![],
51            direction: None,
52        })
53    }
54
55    pub fn create_rp_conn() -> Result<RtcPeerConnection, SetupError> {
56        // If no stun server is configured, only local IPs will be sent in the browser.
57        // At least the node webrtc does the correct thing...
58        let mut config = RtcConfiguration::new();
59        let servers_obj = vec![
60            IceServer {
61                urls: "stun:stun.l.google.com:19302",
62                username: None,
63                credential: None,
64            },
65            IceServer {
66                urls: "turn:web.fledg.re:3478",
67                username: Some("something"),
68                credential: Some("something"),
69            },
70        ];
71        let servers =
72            JsValue::from_serde(&servers_obj).map_err(|e| SetupError::SetupFail(e.to_string()))?;
73        config.ice_servers(&servers);
74        RtcPeerConnection::new_with_configuration(&config)
75            .map_err(|e| SetupError::SetupFail(format!("PeerConnection error: {:?}", e)))
76    }
77
78    pub fn reset(&mut self) -> Result<(), SetupError> {
79        let empty_callback = Closure::wrap(Box::new(move |_: MessageEvent| {
80            log::warn!("Got callback after reset");
81        }) as Box<dyn FnMut(MessageEvent)>);
82
83        if let Some(rtc_data_opt) = self.rtc_data.try_lock() {
84            if let Some(rtc_data) = rtc_data_opt.as_ref() {
85                rtc_data.set_onmessage(Some(empty_callback.as_ref().unchecked_ref()));
86                rtc_data.set_onopen(Some(empty_callback.as_ref().unchecked_ref()));
87            }
88        }
89        self.rp_conn
90            .set_onicecandidate(Some(empty_callback.as_ref().unchecked_ref()));
91        self.rp_conn
92            .set_ondatachannel(Some(empty_callback.as_ref().unchecked_ref()));
93
94        empty_callback.forget();
95
96        self.rp_conn.close();
97        self.rp_conn = Self::create_rp_conn()?;
98        WebRTCConnectionSetup::ice_start(&self.rp_conn, self.broker.clone());
99        self.direction = None;
100        if let Some(mut rd) = self.rtc_data.try_lock() {
101            rd.as_ref().map(|r| r.close());
102            *rd = None;
103        }
104        Ok(())
105    }
106
107    pub fn ice_start(rp_conn: &RtcPeerConnection, broker: Broker<WebRTCMessage>) {
108        let broker_cl = broker.clone();
109        let onicecandidate_callback1 =
110            Closure::wrap(Box::new(move |ev: RtcPeerConnectionIceEvent| {
111                let mut broker = broker_cl.clone();
112                if let Some(candidate) = ev.candidate() {
113                    let cand = format!("{}", candidate.candidate());
114                    wasm_bindgen_futures::spawn_local(async move {
115                        broker
116                            .emit_msg(WebRTCMessage::Output(WebRTCOutput::Setup(
117                                PeerMessage::IceCandidate(cand),
118                            )))
119                            .await
120                            .err()
121                            .map(|e| log::error!("While sending ICE candidate: {:?}", e));
122                    });
123                }
124            }) as Box<dyn FnMut(RtcPeerConnectionIceEvent)>);
125        rp_conn.set_onicecandidate(Some(onicecandidate_callback1.as_ref().unchecked_ref()));
126        onicecandidate_callback1.forget();
127        let broker_cl = broker.clone();
128        let rp_conn_cl = rp_conn.clone();
129        let oniceconnectionstatechange =
130            Closure::wrap(Box::new(move |_: RtcPeerConnectionIceEvent| {
131                let msg = match rp_conn_cl.ice_connection_state() {
132                    RtcIceConnectionState::Failed | RtcIceConnectionState::Disconnected => {
133                        WebRTCMessage::Output(WebRTCOutput::Disconnected)
134                    }
135                    _ => WebRTCMessage::Input(WebRTCInput::UpdateState),
136                };
137                let mut broker = broker_cl.clone();
138                wasm_bindgen_futures::spawn_local(async move {
139                    broker
140                        .emit_msg(msg)
141                        .await
142                        .err()
143                        .map(|e| log::error!("While sending ICE candidate: {:?}", e));
144                });
145            }) as Box<dyn FnMut(RtcPeerConnectionIceEvent)>);
146        rp_conn.set_oniceconnectionstatechange(Some(
147            oniceconnectionstatechange.as_ref().unchecked_ref(),
148        ));
149        oniceconnectionstatechange.forget();
150    }
151
152    // Returns the offer string that needs to be sent to the `Follower` node.
153    pub async fn make_offer(&mut self) -> Result<String, SetupError> {
154        if self.direction.is_some() {
155            log::warn!("Resetting with offer in already opened connection");
156            self.reset()?;
157        };
158        self.direction = Some(Direction::Outgoing);
159
160        let dc = self.rp_conn.create_data_channel("data-channel");
161        Self::dc_set_onopen(self.broker.clone(), self.rtc_data.clone(), dc);
162
163        let co = self.rp_conn.create_offer();
164        let offer = JsFuture::from(co)
165            .await
166            .map_err(|e| SetupError::SetupFail(format!("{:?}", e)))?;
167        let offer_sdp = Reflect::get(&offer, &JsValue::from_str("sdp"))
168            .map_err(|e| SetupError::SetupFail(format!("{:?}", e)))?
169            .as_string()
170            .unwrap();
171
172        let mut offer_obj = RtcSessionDescriptionInit::new(RtcSdpType::Offer);
173        offer_obj.sdp(&offer_sdp);
174        let sld_promise = self.rp_conn.set_local_description(&offer_obj);
175        JsFuture::from(sld_promise)
176            .await
177            .map_err(|e| SetupError::SetupFail(format!("{:?}", e)))?;
178        Ok(offer_sdp)
179    }
180
181    // Takes the offer string
182    pub async fn make_answer(&mut self, offer: String) -> Result<String, SetupError> {
183        if self.direction.is_some() {
184            log::warn!("Resetting with offer in already opened connection");
185            self.reset()?;
186        };
187        self.direction = Some(Direction::Incoming);
188
189        self.dc_create_follow();
190
191        let mut offer_obj = RtcSessionDescriptionInit::new(RtcSdpType::Offer);
192        offer_obj.sdp(&offer);
193        let srd_promise = self.rp_conn.set_remote_description(&offer_obj);
194        JsFuture::from(srd_promise)
195            .await
196            .map_err(|e| SetupError::SetupFail(e.as_string().unwrap()))?;
197
198        let answer = match JsFuture::from(self.rp_conn.create_answer()).await {
199            Ok(f) => f,
200            Err(e) => {
201                error!("Error answer: {:?}", e);
202                return Err(SetupError::SetupFail(e.as_string().unwrap()));
203            }
204        };
205        let answer_sdp = Reflect::get(&answer, &JsValue::from_str("sdp"))
206            .map_err(|e| SetupError::SetupFail(e.as_string().unwrap()))?
207            .as_string()
208            .unwrap();
209
210        let mut answer_obj = RtcSessionDescriptionInit::new(RtcSdpType::Answer);
211        answer_obj.sdp(&answer_sdp);
212        let sld_promise = self.rp_conn.set_local_description(&answer_obj);
213        JsFuture::from(sld_promise)
214            .await
215            .map_err(|e| SetupError::SetupFail(e.as_string().unwrap()))?;
216        Ok(answer_sdp)
217    }
218
219    // Takes the answer string and finalizes the first part of the connection.
220    pub async fn use_answer(&mut self, answer: String) -> Result<(), SetupError> {
221        let dir = self
222            .direction
223            .clone()
224            .ok_or_else(|| SetupError::SetupFail("Direction not set".to_string()))?;
225        (dir == Direction::Outgoing)
226            .then(|| ())
227            .ok_or_else(|| SetupError::SetupFail("Should be outgoing direction".to_string()))?;
228
229        if self.rp_conn.signaling_state() == RtcSignalingState::Stable {
230            return Ok(());
231        }
232        let mut answer_obj = RtcSessionDescriptionInit::new(RtcSdpType::Answer);
233        answer_obj.sdp(&answer);
234        let srd_promise = self.rp_conn.set_remote_description(&answer_obj);
235        JsFuture::from(srd_promise)
236            .await
237            .map_err(|e| SetupError::SetupFail(format!("{:?}", e)))?;
238        Ok(())
239    }
240
241    // Sends the ICE string to the WebRTC.
242    pub async fn ice_put(&mut self, ice: String) -> Result<(), SetupError> {
243        let mut ric_init = RtcIceCandidateInit::new(&ice);
244        ric_init.sdp_mid(Some("0"));
245        ric_init.sdp_m_line_index(Some(0u16));
246        match RtcIceCandidate::new(&ric_init) {
247            Ok(e) => {
248                if let Err(err) = wasm_bindgen_futures::JsFuture::from(
249                    self.rp_conn
250                        .add_ice_candidate_with_opt_rtc_ice_candidate(Some(&e)),
251                )
252                .await
253                {
254                    warn!("Couldn't add ice candidate: {:?}", err);
255                }
256                Ok(())
257            }
258            Err(err) => Err(SetupError::SetupFail(format!(
259                "Couldn't consume ice: {:?}",
260                err
261            ))),
262        }
263        .map_err(|js| SetupError::SetupFail(js.to_string()))
264    }
265
266    pub async fn send(&mut self, msg: String) -> Result<(), SetupError> {
267        self.queue.push(msg);
268        self.send_queue().await
269    }
270
271    pub async fn send_queue(&mut self) -> Result<(), SetupError> {
272        let state = self.get_state().await?;
273        if let Some(state) = state.data_connection {
274            if state == DataChannelState::Open {
275                let rtc_data = self.rtc_data.try_lock().unwrap();
276                if let Some(ref mut data_channel) = rtc_data.as_ref() {
277                    for msg_queue in self.queue.drain(..) {
278                        data_channel
279                            .send_with_str(&msg_queue)
280                            .map_err(|e| SetupError::Send(format!("{e:?}")))?;
281                    }
282                    return Ok(());
283                }
284            }
285        }
286        Ok(())
287    }
288
289    fn dc_set_onopen(
290        broker: Broker<WebRTCMessage>,
291        rtc_data: Arc<Mutex<Option<RtcDataChannel>>>,
292        dc: RtcDataChannel,
293    ) {
294        let dc_clone = dc.clone();
295        let ondatachannel_open = Closure::wrap(Box::new(move |_ev: Event| {
296            let mut broker_clone = broker.clone();
297            let rtc_data = Arc::clone(&rtc_data);
298            let dc_clone2 = dc_clone.clone();
299            wasm_bindgen_futures::spawn_local(async move {
300                rtc_data.lock().await.replace(dc_clone2.clone());
301                broker_clone
302                    .emit_msg(WebRTCMessage::Output(WebRTCOutput::Connected))
303                    .await
304                    .err()
305                    .map(|e| log::error!("While sending connection: {:?}", e));
306            });
307
308            let broker_cl = broker.clone();
309            let onmessage_callback = Closure::wrap(Box::new(move |ev: MessageEvent| {
310                if let Some(message) = ev.data().as_string() {
311                    let mut broker = broker_cl.clone();
312                    wasm_bindgen_futures::spawn_local(async move {
313                        broker
314                            .emit_msg(WebRTCMessage::Output(WebRTCOutput::Text(message)))
315                            .await
316                            .err()
317                            .map(|e| log::error!("While sending message: {:?}", e));
318                    });
319                }
320            }) as Box<dyn FnMut(MessageEvent)>);
321            dc_clone.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
322            onmessage_callback.forget();
323
324            let broker_cl = broker.clone();
325            let onerror_callback = Closure::wrap(Box::new(move |ev: MessageEvent| {
326                let mut broker = broker_cl.clone();
327                wasm_bindgen_futures::spawn_local(async move {
328                    broker
329                        .emit_msg(WebRTCMessage::Output(WebRTCOutput::Error(format!(
330                            "{:?}",
331                            ev
332                        ))))
333                        .await
334                        .err()
335                        .map(|e| log::error!("While sending message: {:?}", e));
336                });
337            }) as Box<dyn FnMut(MessageEvent)>);
338            dc_clone.set_onclose(Some(onerror_callback.as_ref().unchecked_ref()));
339            onerror_callback.forget();
340        }) as Box<dyn FnMut(Event)>);
341        dc.set_onopen(Some(ondatachannel_open.as_ref().unchecked_ref()));
342        ondatachannel_open.forget();
343    }
344
345    fn dc_create_follow(&self) {
346        let broker = self.broker.clone();
347        let rtc_data = self.rtc_data.clone();
348        let ondatachannel_callback = Closure::wrap(Box::new(move |ev: RtcDataChannelEvent| {
349            Self::dc_set_onopen(broker.clone(), rtc_data.clone(), ev.channel());
350        })
351            as Box<dyn FnMut(RtcDataChannelEvent)>);
352        self.rp_conn
353            .set_ondatachannel(Some(ondatachannel_callback.as_ref().unchecked_ref()));
354        ondatachannel_callback.forget();
355    }
356
357    pub async fn get_state(&self) -> Result<ConnectionStateMap, SetupError> {
358        let stats = self.rp_conn.get_stats();
359        let conn_stats: js_sys::Map = wasm_bindgen_futures::JsFuture::from(stats)
360            .await
361            .unwrap()
362            .into();
363
364        // conn_stats.for_each(&mut |v, k| log_1(&format!("- {:?}: {:?}", k, v).into()));
365        let mut type_remote = ConnType::Unknown;
366        conn_stats.for_each(&mut |k, _v| {
367            let s = format!("{:?}", k);
368            if s.contains("candidateType\":\"srflx") {
369                type_remote = ConnType::STUNServer;
370            } else if s.contains("candidateType\":\"prflx") {
371                type_remote = ConnType::STUNPeer;
372            } else if s.contains("candidateType\":\"relay") {
373                type_remote = ConnType::TURN;
374            } else if s.contains("candidateType\":\"host") {
375                type_remote = ConnType::Host;
376            }
377        });
378
379        let signaling = match self.rp_conn.signaling_state() {
380            RtcSignalingState::Stable => SignalingState::Stable,
381            RtcSignalingState::Closed => SignalingState::Closed,
382            _ => SignalingState::Setup,
383        };
384
385        let ice_gathering = match self.rp_conn.ice_gathering_state() {
386            RtcIceGatheringState::New => IceGatheringState::New,
387            RtcIceGatheringState::Gathering => IceGatheringState::Gathering,
388            RtcIceGatheringState::Complete => IceGatheringState::Complete,
389            RtcIceGatheringState::__Nonexhaustive => IceGatheringState::New,
390        };
391
392        let ice_connection = match self.rp_conn.ice_connection_state() {
393            RtcIceConnectionState::New => IceConnectionState::New,
394            RtcIceConnectionState::Checking => IceConnectionState::Checking,
395            RtcIceConnectionState::Connected => IceConnectionState::Connected,
396            RtcIceConnectionState::Completed => IceConnectionState::Completed,
397            RtcIceConnectionState::Failed => IceConnectionState::Failed,
398            RtcIceConnectionState::Disconnected => IceConnectionState::Disconnected,
399            RtcIceConnectionState::Closed => IceConnectionState::Closed,
400            RtcIceConnectionState::__Nonexhaustive => IceConnectionState::New,
401        };
402
403        let mut data_connection = None;
404        if let Some(rtc_data) = self.rtc_data.try_lock() {
405            if let Some(rtc_data_ref) = rtc_data.as_ref() {
406                data_connection = Some(match rtc_data_ref.ready_state() {
407                    RtcDataChannelState::Connecting => DataChannelState::Connecting,
408                    RtcDataChannelState::Open => DataChannelState::Open,
409                    RtcDataChannelState::Closing => DataChannelState::Closing,
410                    RtcDataChannelState::Closed => DataChannelState::Closed,
411                    RtcDataChannelState::__Nonexhaustive => DataChannelState::Closed,
412                });
413            }
414        }
415
416        Ok(ConnectionStateMap {
417            ice_gathering,
418            ice_connection,
419            data_connection,
420            signaling,
421            delay_ms: 0,
422            tx_bytes: 0,
423            rx_bytes: 0,
424            type_remote,
425            type_local: type_remote,
426        })
427    }
428}