antenna_client_web/signaling/
mod.rs1use crate::Peer;
2use antenna_client_shared::{ClientMsg, ServerMsg};
3use antenna_protocol::UserMsgPayload;
4use anyhow::{Result, anyhow};
5use futures::StreamExt;
6use futures::channel::{mpsc, oneshot};
7use serde::Serialize;
8use std::{cell::RefCell, rc::Rc};
9use wasm_bindgen::{JsCast, JsValue, closure::Closure};
10use wasm_bindgen_futures::spawn_local;
11use web_sys::{MessageEvent, WebSocket};
12
13pub struct SignalingClient {
15 ws: WebSocket,
16 rx: mpsc::UnboundedReceiver<String>,
17 _onmessage_cb: Closure<dyn FnMut(MessageEvent)>,
18}
19
20impl SignalingClient {
21 pub async fn connect(url: &str) -> Result<Self> {
22 let ws = WebSocket::new(url).map_err(|e| anyhow!("WebSocket::new failed: {:?}", e))?;
23
24 let (msg_tx, msg_rx) = mpsc::unbounded::<String>();
25 let onmessage_cb = {
26 let msg_tx = msg_tx.clone();
27 let cb = Closure::<dyn FnMut(MessageEvent)>::new(move |e: MessageEvent| {
28 if let Some(text) = e.data().as_string() {
29 let _ = msg_tx.unbounded_send(text);
30 }
31 });
32 ws.set_onmessage(Some(cb.as_ref().unchecked_ref()));
33 cb
34 };
35
36 let (open_tx, open_rx) = oneshot::channel::<Result<()>>();
37 let open_tx = Rc::new(RefCell::new(Some(open_tx)));
38
39 let _onopen_cb = {
40 let open_tx = open_tx.clone();
41 let cb = Closure::<dyn FnMut()>::new(move || {
42 if let Some(tx) = open_tx.borrow_mut().take() {
43 let _ = tx.send(Ok(()));
44 }
45 });
46 ws.set_onopen(Some(cb.as_ref().unchecked_ref()));
47 cb
48 };
49
50 let _onerror_cb = {
51 let open_tx = open_tx.clone();
52 let cb = Closure::<dyn FnMut(JsValue)>::new(move |_| {
53 if let Some(tx) = open_tx.borrow_mut().take() {
54 let _ = tx.send(Err(anyhow!("WebSocket connection failed")));
55 }
56 });
57 ws.set_onerror(Some(cb.as_ref().unchecked_ref()));
58 cb
59 };
60
61 open_rx
62 .await
63 .map_err(|_| anyhow!("Open signal dropped"))??;
64
65 ws.set_onopen(None);
66 ws.set_onerror(None);
67
68 Ok(Self {
69 ws,
70 rx: msg_rx,
71 _onmessage_cb: onmessage_cb,
72 })
73 }
74
75 #[allow(clippy::await_holding_refcell_ref)]
76 pub async fn join<Msg: UserMsgPayload + 'static>(
77 mut self,
78 room_id: String,
79 peer: Rc<RefCell<Peer<Msg>>>,
80 ) -> Result<()> {
81 self.send(&ClientMsg::Join { room_id: &room_id })?;
82
83 let text = self.recv_text().await?;
84 match parse(&text)? {
85 ServerMsg::RequestOffer => {
86 let offer = peer.borrow().start().await?;
87 self.send(&ClientMsg::Offer {
88 room_id: &room_id,
89 offer: &offer,
90 })?;
91 }
92 ServerMsg::OfferReceived { offer } => {
93 let answer = peer.borrow().receive_offer(offer).await?;
94 self.send(&ClientMsg::Answer {
95 room_id: &room_id,
96 answer: &answer,
97 })?;
98 }
99 ServerMsg::Error { message } => return Err(anyhow!("{message}")),
100 _ => return Err(anyhow!("Unexpected message after join")),
101 }
102
103 spawn_local(async move {
104 while let Some(text) = self.rx.next().await {
105 match parse(&text) {
106 Ok(ServerMsg::RequestOffer) => {
107 let offer = match peer.borrow().start().await {
108 Ok(o) => o,
109 Err(_) => break,
110 };
111 if self
112 .send(&ClientMsg::Offer {
113 room_id: &room_id,
114 offer: &offer,
115 })
116 .is_err()
117 {
118 break;
119 }
120 }
121 Ok(ServerMsg::AnswerReceived { answer }) => {
122 let _ = peer.borrow().receive_answer(answer).await;
123 }
124 _ => {}
125 }
126 }
127 });
128
129 Ok(())
130 }
131
132 fn send(&self, msg: &impl Serialize) -> Result<()> {
133 let text = serde_json::to_string(msg)?;
134 self.ws
135 .send_with_str(&text)
136 .map_err(|e| anyhow!("WebSocket send failed: {:?}", e))
137 }
138
139 async fn recv_text(&mut self) -> Result<String> {
140 self.rx
141 .next()
142 .await
143 .ok_or_else(|| anyhow!("WebSocket closed"))
144 }
145}
146
147fn parse(text: &str) -> Result<ServerMsg<'_>> {
148 serde_json::from_str(text).map_err(|e| anyhow!("Failed to parse server message: {e}"))
149}
150
151impl Drop for SignalingClient {
152 fn drop(&mut self) {
153 self.ws.set_onmessage(None);
154 self.ws.set_onopen(None);
155 self.ws.set_onerror(None);
156 let _ = self.ws.close();
157 }
158}