Skip to main content

antenna_client_web/signaling/
mod.rs

1use crate::Peer;
2use antenna_client_shared::{ClientMsg, ServerMsg};
3use antenna_protocol::UserMsgPayload;
4use anyhow::{Result, anyhow};
5use futures::StreamExt;
6use futures::channel::{mpsc, oneshot};
7use serde::Serialize;
8use std::{cell::RefCell, rc::Rc};
9use wasm_bindgen::{JsCast, JsValue, closure::Closure};
10use wasm_bindgen_futures::spawn_local;
11use web_sys::{MessageEvent, WebSocket};
12
13/// Browser-side client for the bundled antenna signaling server.
14pub struct SignalingClient {
15    ws: WebSocket,
16    rx: mpsc::UnboundedReceiver<String>,
17    _onmessage_cb: Closure<dyn FnMut(MessageEvent)>,
18}
19
20impl SignalingClient {
21    pub async fn connect(url: &str) -> Result<Self> {
22        let ws = WebSocket::new(url).map_err(|e| anyhow!("WebSocket::new failed: {:?}", e))?;
23
24        let (msg_tx, msg_rx) = mpsc::unbounded::<String>();
25        let onmessage_cb = {
26            let msg_tx = msg_tx.clone();
27            let cb = Closure::<dyn FnMut(MessageEvent)>::new(move |e: MessageEvent| {
28                if let Some(text) = e.data().as_string() {
29                    let _ = msg_tx.unbounded_send(text);
30                }
31            });
32            ws.set_onmessage(Some(cb.as_ref().unchecked_ref()));
33            cb
34        };
35
36        let (open_tx, open_rx) = oneshot::channel::<Result<()>>();
37        let open_tx = Rc::new(RefCell::new(Some(open_tx)));
38
39        let _onopen_cb = {
40            let open_tx = open_tx.clone();
41            let cb = Closure::<dyn FnMut()>::new(move || {
42                if let Some(tx) = open_tx.borrow_mut().take() {
43                    let _ = tx.send(Ok(()));
44                }
45            });
46            ws.set_onopen(Some(cb.as_ref().unchecked_ref()));
47            cb
48        };
49
50        let _onerror_cb = {
51            let open_tx = open_tx.clone();
52            let cb = Closure::<dyn FnMut(JsValue)>::new(move |_| {
53                if let Some(tx) = open_tx.borrow_mut().take() {
54                    let _ = tx.send(Err(anyhow!("WebSocket connection failed")));
55                }
56            });
57            ws.set_onerror(Some(cb.as_ref().unchecked_ref()));
58            cb
59        };
60
61        open_rx
62            .await
63            .map_err(|_| anyhow!("Open signal dropped"))??;
64
65        ws.set_onopen(None);
66        ws.set_onerror(None);
67
68        Ok(Self {
69            ws,
70            rx: msg_rx,
71            _onmessage_cb: onmessage_cb,
72        })
73    }
74
75    #[allow(clippy::await_holding_refcell_ref)]
76    pub async fn join<Msg: UserMsgPayload + 'static>(
77        mut self,
78        room_id: String,
79        peer: Rc<RefCell<Peer<Msg>>>,
80    ) -> Result<()> {
81        self.send(&ClientMsg::Join { room_id: &room_id })?;
82
83        let text = self.recv_text().await?;
84        match parse(&text)? {
85            ServerMsg::RequestOffer => {
86                let offer = peer.borrow().start().await?;
87                self.send(&ClientMsg::Offer {
88                    room_id: &room_id,
89                    offer: &offer,
90                })?;
91            }
92            ServerMsg::OfferReceived { offer } => {
93                let answer = peer.borrow().receive_offer(offer).await?;
94                self.send(&ClientMsg::Answer {
95                    room_id: &room_id,
96                    answer: &answer,
97                })?;
98            }
99            ServerMsg::Error { message } => return Err(anyhow!("{message}")),
100            _ => return Err(anyhow!("Unexpected message after join")),
101        }
102
103        spawn_local(async move {
104            while let Some(text) = self.rx.next().await {
105                match parse(&text) {
106                    Ok(ServerMsg::RequestOffer) => {
107                        let offer = match peer.borrow().start().await {
108                            Ok(o) => o,
109                            Err(_) => break,
110                        };
111                        if self
112                            .send(&ClientMsg::Offer {
113                                room_id: &room_id,
114                                offer: &offer,
115                            })
116                            .is_err()
117                        {
118                            break;
119                        }
120                    }
121                    Ok(ServerMsg::AnswerReceived { answer }) => {
122                        let _ = peer.borrow().receive_answer(answer).await;
123                    }
124                    _ => {}
125                }
126            }
127        });
128
129        Ok(())
130    }
131
132    fn send(&self, msg: &impl Serialize) -> Result<()> {
133        let text = serde_json::to_string(msg)?;
134        self.ws
135            .send_with_str(&text)
136            .map_err(|e| anyhow!("WebSocket send failed: {:?}", e))
137    }
138
139    async fn recv_text(&mut self) -> Result<String> {
140        self.rx
141            .next()
142            .await
143            .ok_or_else(|| anyhow!("WebSocket closed"))
144    }
145}
146
147fn parse(text: &str) -> Result<ServerMsg<'_>> {
148    serde_json::from_str(text).map_err(|e| anyhow!("Failed to parse server message: {e}"))
149}
150
151impl Drop for SignalingClient {
152    fn drop(&mut self) {
153        self.ws.set_onmessage(None);
154        self.ws.set_onopen(None);
155        self.ws.set_onerror(None);
156        let _ = self.ws.close();
157    }
158}