1use async_trait::async_trait;
2use flmodules::broker::{Broker, Destination, Subsystem, SubsystemListener};
3use flnet::web_rtc::messages::{
4 PeerMessage, SetupError, WebRTCInput, WebRTCMessage, WebRTCOutput, WebRTCSpawner,
5};
6
7use crate::web_rtc_setup::WebRTCConnectionSetup;
8
9pub struct WebRTCConnection {
10 setup: WebRTCConnectionSetup,
11}
12
13impl WebRTCConnection {
14 pub async fn new_box() -> Result<Broker<WebRTCMessage>, SetupError> {
15 let broker = Broker::new();
16 let rn = WebRTCConnection {
17 setup: WebRTCConnectionSetup::new(broker.clone()).await?,
18 };
19 let rp_conn = rn.setup.rp_conn.clone();
20
21 broker
22 .clone()
23 .add_subsystem(Subsystem::Handler(Box::new(rn)))
24 .await?;
25 WebRTCConnectionSetup::ice_start(&rp_conn, broker.clone());
26 Ok(broker)
27 }
28
29 async fn setup(&mut self, pm: PeerMessage) -> Result<Option<PeerMessage>, SetupError> {
30 Ok(match pm {
31 PeerMessage::Init => Some(PeerMessage::Offer(self.setup.make_offer().await?)),
32 PeerMessage::Offer(o) => Some(PeerMessage::Answer(self.setup.make_answer(o).await?)),
33 PeerMessage::Answer(a) => {
34 self.setup.use_answer(a).await?;
35 None
36 }
37 PeerMessage::IceCandidate(ice) => {
38 self.setup.ice_put(ice).await?;
39 None
40 }
41 })
42 }
43
44 async fn msg_in(&mut self, msg: WebRTCInput) -> Result<Option<WebRTCMessage>, SetupError> {
45 match msg {
46 WebRTCInput::Text(s) => self.setup.send(s).await?,
47 WebRTCInput::Setup(s) => {
48 if let Some(msg) = self.setup(s).await? {
49 return Ok(Some(WebRTCMessage::Output(WebRTCOutput::Setup(msg))));
50 }
51 }
52 WebRTCInput::Flush => {
53 self.setup.send_queue().await?;
54 }
55 WebRTCInput::UpdateState => {
56 return Ok(Some(WebRTCMessage::Output(WebRTCOutput::State(
57 self.setup.get_state().await?,
58 ))));
59 }
60 WebRTCInput::Reset => self.setup.reset()?,
61 }
62 Ok(None)
63 }
64}
65
66#[async_trait(?Send)]
67impl SubsystemListener<WebRTCMessage> for WebRTCConnection {
68 async fn messages(&mut self, msgs: Vec<WebRTCMessage>) -> Vec<(Destination, WebRTCMessage)> {
69 let mut out = vec![];
70 for msg in msgs {
71 if let WebRTCMessage::Input(msg_in) = msg {
72 match self.msg_in(msg_in).await {
73 Ok(Some(msg)) => out.push((Destination::Others, msg)),
74 Ok(None) => {}
75 Err(e) => {
76 log::warn!("Error processing message: {:?}", e);
77 }
78 }
79 }
80 }
81 out
82 }
83}
84
85pub fn web_rtc_spawner() -> WebRTCSpawner {
86 Box::new(|| Box::new(Box::pin(WebRTCConnection::new_box())))
87}