1use async_trait::async_trait;
7use futures::channel::{mpsc, oneshot};
8
9pub type ConnectReplySender =
11 std::sync::Arc<std::sync::Mutex<Option<oneshot::Sender<Result<(), crate::error::Error>>>>>;
12
/// Notifications sent from the transport layer to its owner over the event
/// channel.
#[derive(Clone, Debug)]
pub enum UnderlyingLayerEvent {
    /// The connection has been opened.
    Connected,
    /// A binary payload was received; carries the raw bytes.
    Message(Vec<u8>),
    /// Something went wrong; carries a human-readable description.
    Error(String),
    /// The connection has been closed.
    Closed,
    /// A timer fired; carries the `kind` string the timer was armed with.
    TimerExpired(String),
}
24
25#[derive(Debug, Clone)]
27pub enum UnderlyingLayerCommand {
28 Connect(String, ConnectReplySender),
29 SendData(Vec<u8>),
30 Close,
31 TimerReset {
34 kind: String,
35 duration_ms: u64,
36 },
37 TimerCancel {
39 kind: String,
40 },
41}
42
/// Channel-driven transport abstraction (wasm32 build).
///
/// The async methods are declared with `?Send`, so the futures they return are
/// not required to be `Send`.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait UnderlyingLayerInterface {
    /// Returns the receiving end of the channel on which the transport
    /// publishes `UnderlyingLayerEvent`s.
    fn event_receiver(&mut self) -> mpsc::UnboundedReceiver<UnderlyingLayerEvent>;

    /// Returns a sender through which `UnderlyingLayerCommand`s can be
    /// submitted to the transport.
    fn command_sender(&self) -> mpsc::UnboundedSender<UnderlyingLayerCommand>;

    /// Drives the transport. Presumably this processes submitted commands and
    /// must be awaited for the transport to make progress — confirm against
    /// the concrete implementation.
    async fn run(&mut self);
}
56
57#[async_trait(?Send)]
59#[cfg(not(target_arch = "wasm32"))]
60pub trait UnderlyingLayerInterface: Send {
61 fn event_receiver(&mut self) -> mpsc::UnboundedReceiver<UnderlyingLayerEvent>;
63
64 fn command_sender(&self) -> mpsc::UnboundedSender<UnderlyingLayerCommand>;
66
67 async fn run(&mut self);
69}
70
/// Transport implementation backed by the browser's WebSocket API
/// (wasm32 builds only).
#[cfg(target_arch = "wasm32")]
pub struct BrowserWebSocket {
    /// Producer side of the event channel; cloned into the JS callbacks.
    event_sender: mpsc::UnboundedSender<UnderlyingLayerEvent>,
    /// Consumer side of the event channel; `None` once it has been handed out.
    event_receiver: Option<mpsc::UnboundedReceiver<UnderlyingLayerEvent>>,
    /// Producer side of the command channel; cloned for each caller.
    command_sender: mpsc::UnboundedSender<UnderlyingLayerCommand>,
    /// Consumer side of the command channel, drained by `run`.
    command_receiver: mpsc::UnboundedReceiver<UnderlyingLayerCommand>,
    /// Timer id returned by `crate::platform::set_timeout`, keyed by timer kind.
    active_timers: std::collections::HashMap<String, i32>,
}
81
#[cfg(target_arch = "wasm32")]
impl BrowserWebSocket {
    /// Builds an idle transport: both unbounded channels are created up front,
    /// no timers are registered, and no socket exists until a `Connect`
    /// command is processed by `run`.
    pub fn new() -> Self {
        let (event_tx, event_rx) = mpsc::unbounded();
        let (command_tx, command_rx) = mpsc::unbounded();

        Self {
            event_sender: event_tx,
            event_receiver: Some(event_rx),
            command_sender: command_tx,
            command_receiver: command_rx,
            active_timers: Default::default(),
        }
    }
}
97
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
impl UnderlyingLayerInterface for BrowserWebSocket {
    /// Hands out the event receiver created in `new()`.
    ///
    /// The real receiver can be taken only once. Any later call returns the
    /// receiving half of a fresh channel whose sender has already been
    /// dropped, so that stream never yields an event.
    fn event_receiver(&mut self) -> mpsc::UnboundedReceiver<UnderlyingLayerEvent> {
        self.event_receiver.take().unwrap_or_else(|| {
            let (_, receiver) = mpsc::unbounded();
            receiver
        })
    }

    /// Returns a clone of the sender feeding the command loop in `run`.
    fn command_sender(&self) -> mpsc::UnboundedSender<UnderlyingLayerCommand> {
        self.command_sender.clone()
    }

    /// Command loop: awaits `UnderlyingLayerCommand`s and executes them against
    /// the browser WebSocket / timer APIs, publishing `UnderlyingLayerEvent`s
    /// on the event channel.
    ///
    /// * `Connect` – replaces any existing socket (the old one is detached and
    ///   closed), opens a new one with the `mqtt` sub-protocol and installs
    ///   the open/message/error/close handlers.
    /// * `SendData` – sends the bytes, or emits an `Error` event when there is
    ///   no socket or the send fails.
    /// * `Close` – detaches the handlers, closes the socket and emits `Closed`.
    /// * `TimerReset` / `TimerCancel` – arm or cancel a timer keyed by `kind`.
    ///
    /// The loop ends only when the command channel is exhausted.
    async fn run(&mut self) {
        use futures::stream::StreamExt;
        use wasm_bindgen::prelude::*;
        use wasm_bindgen::JsCast;
        use web_sys::{BinaryType, ErrorEvent, MessageEvent, WebSocket};

        // Writes one line to the browser console.
        fn console_log(message: &str) {
            web_sys::console::log_1(&wasm_bindgen::JsValue::from_str(message));
        }

        // Unhooks every event handler from `ws` and then closes it. Handlers
        // are removed first so that the Rust closures backing them can be
        // dropped afterwards without JS still holding a reference to them.
        fn detach_and_close(ws: &web_sys::WebSocket) {
            ws.set_onopen(None);
            ws.set_onmessage(None);
            ws.set_onerror(None);
            ws.set_onclose(None);
            let _ = ws.close();
        }

        console_log("🚀 WEBSOCKET: NEW VERSION - CONNECT TIMING FIXED 🚀");

        let mut websocket: Option<WebSocket> = None;
        // Keeps the handler closures alive for as long as the socket uses them.
        let mut closures: Vec<Closure<dyn FnMut(JsValue)>> = Vec::new();

        console_log("WebSocket processor waiting for commands");

        while let Some(command) = self.command_receiver.next().await {
            console_log(&format!(
                "WebSocket processor received command: {:?}",
                command
            ));
            match command {
                UnderlyingLayerCommand::Connect(url, reply_arc) => {
                    console_log(&format!("WebSocket connecting to: {}", url));
                    console_log("✅ Received Connect command with reply_arc");

                    // A repeated Connect must not leave the previous socket
                    // open with live handlers, nor let its closures pile up.
                    if let Some(previous) = websocket.take() {
                        detach_and_close(&previous);
                    }
                    closures.clear();

                    let protocols = js_sys::Array::new();
                    protocols.push(&JsValue::from_str("mqtt"));

                    match WebSocket::new_with_str_sequence(&url, &protocols) {
                        Ok(ws) => {
                            console_log("WebSocket created successfully");
                            ws.set_binary_type(BinaryType::Arraybuffer);
                            console_log("Binary type set to ArrayBuffer");

                            let event_sender = self.event_sender.clone();
                            console_log("Event sender cloned for closures");

                            // --- onopen: resolve the connect reply, emit Connected.
                            // NOTE(review): if the socket errors or closes before
                            // opening, the reply sender stays unresolved inside this
                            // closure until the closure is dropped — confirm the
                            // caller copes with that.
                            let open_events = event_sender.clone();
                            console_log("Creating onopen closure");
                            let onopen = Closure::wrap(Box::new(move |_: JsValue| {
                                console_log("🔥 NEW WEBSOCKET: WebSocket onopen fired 🔥");
                                console_log("Attempting to lock reply_arc in onopen");
                                match reply_arc.lock() {
                                    Ok(mut reply_opt) => {
                                        console_log("Successfully locked reply_arc");
                                        if let Some(reply) = reply_opt.take() {
                                            match reply.send(Ok(())) {
                                                Ok(_) => console_log(
                                                    "✅ Sent connect completion reply successfully",
                                                ),
                                                Err(_) => console_log(
                                                    "❌ Failed to send connect completion reply - receiver dropped",
                                                ),
                                            }
                                        } else {
                                            console_log("❌ No reply sender in Option");
                                        }
                                    }
                                    Err(_) => console_log("❌ Failed to lock reply_arc"),
                                }

                                match open_events.unbounded_send(UnderlyingLayerEvent::Connected) {
                                    Ok(_) => console_log("Sent Connected event successfully"),
                                    Err(e) => console_log(&format!(
                                        "Failed to send Connected event: {:?}",
                                        e
                                    )),
                                }
                            })
                                as Box<dyn FnMut(JsValue)>);
                            console_log("onopen closure created, setting on WebSocket");
                            ws.set_onopen(Some(onopen.as_ref().unchecked_ref()));
                            console_log("onopen set on WebSocket, pushing to closures vec");
                            closures.push(onopen);
                            console_log("onopen closure pushed to vec");

                            // --- onmessage: forward binary frames as Message events.
                            let message_events = event_sender.clone();
                            console_log("Creating onmessage closure");
                            let onmessage = Closure::wrap(Box::new(move |e: JsValue| {
                                console_log("WebSocket onmessage fired");
                                // Never panic inside a JS callback: skip anything
                                // that is not a MessageEvent.
                                let event = match e.dyn_into::<MessageEvent>() {
                                    Ok(event) => event,
                                    Err(_) => {
                                        console_log(
                                            "WebSocket onmessage received a non-MessageEvent value; ignoring",
                                        );
                                        return;
                                    }
                                };
                                // Only ArrayBuffer payloads are forwarded; other
                                // payload types are dropped.
                                if let Ok(array_buffer) =
                                    event.data().dyn_into::<js_sys::ArrayBuffer>()
                                {
                                    let uint8_array = js_sys::Uint8Array::new(&array_buffer);
                                    let mut data = vec![0; uint8_array.length() as usize];
                                    uint8_array.copy_to(&mut data);
                                    console_log(&format!("Received {} bytes", data.len()));
                                    match message_events
                                        .unbounded_send(UnderlyingLayerEvent::Message(data))
                                    {
                                        Ok(_) => console_log("Sent Message event successfully"),
                                        Err(e) => console_log(&format!(
                                            "Failed to send Message event: {:?}",
                                            e
                                        )),
                                    }
                                }
                            })
                                as Box<dyn FnMut(JsValue)>);
                            console_log("onmessage closure created, setting on WebSocket");
                            ws.set_onmessage(Some(onmessage.as_ref().unchecked_ref()));
                            console_log("onmessage set on WebSocket, pushing to closures vec");
                            closures.push(onmessage);
                            console_log("onmessage closure pushed to vec");

                            // --- onerror: forward a description as an Error event.
                            let error_events = event_sender.clone();
                            let onerror = Closure::wrap(Box::new(move |e: JsValue| {
                                console_log("WebSocket onerror fired");
                                console_log(&format!("Error event: {:?}", e));

                                let error_msg = match e.dyn_into::<ErrorEvent>() {
                                    Ok(error_event) => {
                                        let msg = error_event.message();
                                        console_log(&format!("ErrorEvent message: {}", msg));
                                        msg
                                    }
                                    Err(_) => {
                                        console_log("Not an ErrorEvent - unknown error");
                                        "Unknown WebSocket error".to_string()
                                    }
                                };
                                let _ = error_events
                                    .unbounded_send(UnderlyingLayerEvent::Error(error_msg));
                            })
                                as Box<dyn FnMut(JsValue)>);
                            ws.set_onerror(Some(onerror.as_ref().unchecked_ref()));
                            closures.push(onerror);

                            // --- onclose: log the close details, emit Closed.
                            let close_events = event_sender;
                            let onclose = Closure::wrap(Box::new(move |e: JsValue| {
                                console_log("WebSocket onclose fired");
                                console_log(&format!("Close event: {:?}", e));

                                if let Ok(close_event) = e.dyn_into::<web_sys::CloseEvent>() {
                                    console_log(&format!(
                                        "Close code: {}, reason: '{}', clean: {}",
                                        close_event.code(),
                                        close_event.reason(),
                                        close_event.was_clean()
                                    ));
                                }

                                let _ = close_events.unbounded_send(UnderlyingLayerEvent::Closed);
                            })
                                as Box<dyn FnMut(JsValue)>);
                            ws.set_onclose(Some(onclose.as_ref().unchecked_ref()));
                            closures.push(onclose);

                            websocket = Some(ws);
                        }
                        Err(e) => {
                            // NOTE(review): the connect reply is not resolved on
                            // this path; presumably the caller reacts to the Error
                            // event instead — confirm.
                            let _ = self.event_sender.unbounded_send(
                                UnderlyingLayerEvent::Error(format!(
                                    "Failed to create WebSocket: {:?}",
                                    e
                                )),
                            );
                        }
                    }
                }
                UnderlyingLayerCommand::SendData(data) => {
                    console_log(&format!(
                        "WebSocket SendData command: {} bytes",
                        data.len()
                    ));
                    match websocket.as_ref() {
                        Some(ws) => {
                            console_log("WebSocket is available, attempting to send");
                            match ws.send_with_u8_array(&data) {
                                Ok(_) => console_log("WebSocket send_with_u8_array succeeded"),
                                Err(e) => {
                                    console_log(&format!(
                                        "WebSocket send_with_u8_array failed: {:?}",
                                        e
                                    ));
                                    let _ = self.event_sender.unbounded_send(
                                        UnderlyingLayerEvent::Error(format!(
                                            "Send failed: {:?}",
                                            e
                                        )),
                                    );
                                }
                            }
                        }
                        None => {
                            console_log("WebSocket not available for sending");
                            let _ = self.event_sender.unbounded_send(
                                UnderlyingLayerEvent::Error("WebSocket not connected".to_string()),
                            );
                        }
                    }
                }
                UnderlyingLayerCommand::Close => {
                    // Detach the handlers before dropping the closures behind
                    // them, so JS never holds a reference to a dropped closure.
                    if let Some(ws) = websocket.take() {
                        detach_and_close(&ws);
                    }
                    closures.clear();

                    // The onclose handler was detached above, so Closed is
                    // reported from here instead.
                    let _ = self
                        .event_sender
                        .unbounded_send(UnderlyingLayerEvent::Closed);
                }
                UnderlyingLayerCommand::TimerReset { kind, duration_ms } => {
                    if let Some(old_timer_id) = self.active_timers.remove(&kind) {
                        console_log(&format!(
                            "Cancelling existing timer {} (ID: {})",
                            kind, old_timer_id
                        ));
                        crate::platform::clear_timeout(old_timer_id);
                    }

                    let event_sender = self.event_sender.clone();
                    let timer_kind = kind.clone();
                    let callback = Closure::wrap(Box::new(move || {
                        console_log(&format!("Timer expired: {}", timer_kind));
                        let _ = event_sender
                            .unbounded_send(UnderlyingLayerEvent::TimerExpired(timer_kind.clone()));
                    }) as Box<dyn Fn()>);

                    // Clamp rather than `as`-cast: a plain cast would wrap large
                    // u64 values (possibly to a negative delay) and make the
                    // timer fire at the wrong time.
                    let timeout_ms = duration_ms.min(i32::MAX as u64) as i32;
                    let timer_id = crate::platform::set_timeout(&callback, timeout_ms);
                    // NOTE(review): `forget()` leaks the closure so that it is
                    // still valid when the timer fires; every reset therefore
                    // leaks one closure. Consider storing the closure next to
                    // the timer id instead.
                    callback.forget();

                    console_log(&format!(
                        "Timer set: {} (ID: {}) for {}ms",
                        kind, timer_id, duration_ms
                    ));
                    self.active_timers.insert(kind, timer_id);
                }
                UnderlyingLayerCommand::TimerCancel { kind } => {
                    match self.active_timers.remove(&kind) {
                        Some(timer_id) => {
                            console_log(&format!(
                                "Timer cancelled: {} (ID: {})",
                                kind, timer_id
                            ));
                            crate::platform::clear_timeout(timer_id);
                        }
                        None => console_log(&format!(
                            "Timer cancel requested but not active: {}",
                            kind
                        )),
                    }
                }
            }
        }
    }
}
413}