Skip to main content

flnet_wasm/
web_rtc.rs

1use async_trait::async_trait;
2use flmodules::broker::{Broker, Destination, Subsystem, SubsystemListener};
3use flnet::web_rtc::messages::{
4    PeerMessage, SetupError, WebRTCInput, WebRTCMessage, WebRTCOutput, WebRTCSpawner,
5};
6
7use crate::web_rtc_setup::WebRTCConnectionSetup;
8
9pub struct WebRTCConnection {
10    setup: WebRTCConnectionSetup,
11}
12
13impl WebRTCConnection {
14    pub async fn new_box() -> Result<Broker<WebRTCMessage>, SetupError> {
15        let broker = Broker::new();
16        let rn = WebRTCConnection {
17            setup: WebRTCConnectionSetup::new(broker.clone()).await?,
18        };
19        let rp_conn = rn.setup.rp_conn.clone();
20
21        broker
22            .clone()
23            .add_subsystem(Subsystem::Handler(Box::new(rn)))
24            .await?;
25        WebRTCConnectionSetup::ice_start(&rp_conn, broker.clone());
26        Ok(broker)
27    }
28
29    async fn setup(&mut self, pm: PeerMessage) -> Result<Option<PeerMessage>, SetupError> {
30        Ok(match pm {
31            PeerMessage::Init => Some(PeerMessage::Offer(self.setup.make_offer().await?)),
32            PeerMessage::Offer(o) => Some(PeerMessage::Answer(self.setup.make_answer(o).await?)),
33            PeerMessage::Answer(a) => {
34                self.setup.use_answer(a).await?;
35                None
36            }
37            PeerMessage::IceCandidate(ice) => {
38                self.setup.ice_put(ice).await?;
39                None
40            }
41        })
42    }
43
44    async fn msg_in(&mut self, msg: WebRTCInput) -> Result<Option<WebRTCMessage>, SetupError> {
45        match msg {
46            WebRTCInput::Text(s) => self.setup.send(s).await?,
47            WebRTCInput::Setup(s) => {
48                if let Some(msg) = self.setup(s).await? {
49                    return Ok(Some(WebRTCMessage::Output(WebRTCOutput::Setup(msg))));
50                }
51            }
52            WebRTCInput::Flush => {
53                self.setup.send_queue().await?;
54            }
55            WebRTCInput::UpdateState => {
56                return Ok(Some(WebRTCMessage::Output(WebRTCOutput::State(
57                    self.setup.get_state().await?,
58                ))));
59            }
60            WebRTCInput::Reset => self.setup.reset()?,
61        }
62        Ok(None)
63    }
64}
65
66#[async_trait(?Send)]
67impl SubsystemListener<WebRTCMessage> for WebRTCConnection {
68    async fn messages(&mut self, msgs: Vec<WebRTCMessage>) -> Vec<(Destination, WebRTCMessage)> {
69        let mut out = vec![];
70        for msg in msgs {
71            if let WebRTCMessage::Input(msg_in) = msg {
72                match self.msg_in(msg_in).await {
73                    Ok(Some(msg)) => out.push((Destination::Others, msg)),
74                    Ok(None) => {}
75                    Err(e) => {
76                        log::warn!("Error processing message: {:?}", e);
77                    }
78                }
79            }
80        }
81        out
82    }
83}
84
85pub fn web_rtc_spawner() -> WebRTCSpawner {
86    Box::new(|| Box::new(Box::pin(WebRTCConnection::new_box())))
87}