Skip to main content

mqtt_client_wasm/
websocket.rs

1//! Underlying layer abstraction for testability
2//!
3//! This module provides an abstraction over the underlying transport layer
4//! (WebSocket, TCP, TLS, etc.) for testability and flexibility.
5
6use async_trait::async_trait;
7use futures::channel::{mpsc, oneshot};
8
9/// Type alias for the connect reply sender
10pub type ConnectReplySender =
11    std::sync::Arc<std::sync::Mutex<Option<oneshot::Sender<Result<(), crate::error::Error>>>>>;
12
/// Underlying layer events (sent FROM transport TO message loop).
///
/// The enum carries only plain data, so it derives `PartialEq`/`Eq` in addition to
/// `Debug`/`Clone`; this lets tests compare received events directly with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UnderlyingLayerEvent {
    /// The transport finished opening.
    Connected,
    /// A binary payload arrived from the peer.
    Message(Vec<u8>),
    /// The transport reported a failure; the string is a human-readable description.
    Error(String),
    /// The transport was closed.
    Closed,
    /// Timer expired event.
    /// The String is the timer kind (e.g., "PingreqSend").
    TimerExpired(String),
}
24
25/// Underlying layer commands (sent TO transport FROM message loop)
26#[derive(Debug, Clone)]
27pub enum UnderlyingLayerCommand {
28    Connect(String, ConnectReplySender),
29    SendData(Vec<u8>),
30    Close,
31    /// Start or reset a timer
32    /// When the timer expires, the underlying layer should send TimerExpired event
33    TimerReset {
34        kind: String,
35        duration_ms: u64,
36    },
37    /// Cancel a timer
38    TimerCancel {
39        kind: String,
40    },
41}
42
/// Transport abstraction used on `wasm32` targets (pure message-passing).
///
/// An implementor exposes an event stream and a command sink, and drives both from `run`.
/// The futures produced by this trait are not required to be `Send`.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait UnderlyingLayerInterface {
    /// Returns the stream of events flowing from the transport to the message loop.
    fn event_receiver(&mut self) -> mpsc::UnboundedReceiver<UnderlyingLayerEvent>;

    /// Returns a sink for commands flowing from the message loop to the transport.
    fn command_sender(&self) -> mpsc::UnboundedSender<UnderlyingLayerCommand>;

    /// Runs the transport processor: consumes commands and produces events.
    async fn run(&mut self);
}
56
57/// Abstract underlying layer interface for testing (pure message-passing)
58#[async_trait(?Send)]
59#[cfg(not(target_arch = "wasm32"))]
60pub trait UnderlyingLayerInterface: Send {
61    /// Get event receiver (events FROM transport TO message loop)
62    fn event_receiver(&mut self) -> mpsc::UnboundedReceiver<UnderlyingLayerEvent>;
63
64    /// Get command sender (commands TO transport FROM message loop)
65    fn command_sender(&self) -> mpsc::UnboundedSender<UnderlyingLayerCommand>;
66
67    /// Start the transport processor (handles commands and generates events)
68    async fn run(&mut self);
69}
70
/// Browser WebSocket implementation (pure message-passing).
///
/// Owns both ends of an event channel and of a command channel. The command receiver is
/// drained by `run()`; the event sender is cloned into the browser callbacks that `run()`
/// installs.
#[cfg(target_arch = "wasm32")]
pub struct BrowserWebSocket {
    /// Sending half of the event channel.
    event_sender: mpsc::UnboundedSender<UnderlyingLayerEvent>,
    /// Receiving half of the event channel; `Some` until `event_receiver()` takes it.
    event_receiver: Option<mpsc::UnboundedReceiver<UnderlyingLayerEvent>>,
    /// Sending half of the command channel; `command_sender()` hands out clones of it.
    command_sender: mpsc::UnboundedSender<UnderlyingLayerCommand>,
    /// Receiving half of the command channel.
    command_receiver: mpsc::UnboundedReceiver<UnderlyingLayerCommand>,
    /// Active timers: kind -> timer_id (the id returned by `crate::platform::set_timeout`).
    active_timers: std::collections::HashMap<String, i32>,
}

#[cfg(target_arch = "wasm32")]
impl BrowserWebSocket {
    /// Create a transport with fresh, unbounded event and command channels and no timers.
    pub fn new() -> Self {
        let (event_sender, event_receiver) = mpsc::unbounded();
        let (command_sender, command_receiver) = mpsc::unbounded();

        Self {
            event_sender,
            event_receiver: Some(event_receiver),
            command_sender,
            command_receiver,
            active_timers: std::collections::HashMap::new(),
        }
    }
}

// A public argument-less `new()` should be mirrored by `Default` so the type works with
// `..Default::default()`, `Option::unwrap_or_default()` and generic `T: Default` code.
#[cfg(target_arch = "wasm32")]
impl Default for BrowserWebSocket {
    /// Equivalent to [`BrowserWebSocket::new`].
    fn default() -> Self {
        Self::new()
    }
}
97
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
impl UnderlyingLayerInterface for BrowserWebSocket {
    /// Hand out the event receiver.
    ///
    /// The real receiver can be taken only once. Any later call returns the receiving half of
    /// a brand-new channel whose sender is dropped right away, so it never yields an event.
    fn event_receiver(&mut self) -> mpsc::UnboundedReceiver<UnderlyingLayerEvent> {
        self.event_receiver.take().unwrap_or_else(|| {
            let (_, receiver) = mpsc::unbounded();
            receiver
        })
    }

    /// Return a clone of the command sender.
    fn command_sender(&self) -> mpsc::UnboundedSender<UnderlyingLayerCommand> {
        self.command_sender.clone()
    }

    /// Process commands until the command channel ends.
    ///
    /// * `Connect` creates a browser `WebSocket` (subprotocol `mqtt`, binary type
    ///   `ArrayBuffer`) and installs `onopen`/`onmessage`/`onerror`/`onclose` handlers that
    ///   forward to the event channel. `onopen` also completes the connect reply with `Ok(())`.
    ///   A socket left over from an earlier `Connect` is detached and closed first, so its
    ///   handlers neither leak nor interleave events with the new connection.
    /// * `SendData` writes the bytes to the current socket, or emits an `Error` event when
    ///   there is no socket or the send fails.
    /// * `Close` detaches the handlers, closes the socket, releases the handler closures and
    ///   emits `Closed`. The loop keeps running so that a later `Connect` can reconnect.
    /// * `TimerReset` / `TimerCancel` arm and disarm named timers; an expired timer emits
    ///   `TimerExpired(kind)`.
    ///
    /// If the socket cannot be created, or closes before `onopen` fired, the pending connect
    /// reply sender is dropped so that whoever awaits it is woken instead of waiting forever.
    async fn run(&mut self) {
        use futures::stream::StreamExt;
        use wasm_bindgen::prelude::*;
        use wasm_bindgen::JsCast;
        use web_sys::{BinaryType, ErrorEvent, MessageEvent, WebSocket};

        // Write one line to the browser console.
        fn console_log(message: &str) {
            web_sys::console::log_1(&wasm_bindgen::JsValue::from_str(message));
        }

        // Unhook every handler from `socket`, then ask the browser to close it.
        // Handlers are removed first so the socket never points at a released closure.
        fn detach_and_close(socket: &web_sys::WebSocket) {
            socket.set_onopen(None);
            socket.set_onmessage(None);
            socket.set_onerror(None);
            socket.set_onclose(None);
            let _ = socket.close();
        }

        console_log("🚀 WEBSOCKET: NEW VERSION - CONNECT TIMING FIXED 🚀");

        let mut websocket: Option<web_sys::WebSocket> = None;
        // Handler closures of the current socket; they must outlive their registration.
        let mut closures: Vec<Closure<dyn FnMut(JsValue)>> = Vec::new();
        // One live callback per timer kind. Keeping the closure here (instead of `forget()`ing
        // it) means a reset or cancel releases the previous callback rather than leaking it.
        let mut timer_callbacks: std::collections::HashMap<String, Closure<dyn Fn()>> =
            std::collections::HashMap::new();

        console_log("WebSocket processor waiting for commands");

        while let Some(command) = self.command_receiver.next().await {
            console_log(&format!(
                "WebSocket processor received command: {:?}",
                command
            ));
            match command {
                UnderlyingLayerCommand::Connect(url, reply_arc) => {
                    console_log(&format!("WebSocket connecting to: {}", url));
                    console_log("✅ Received Connect command with reply_arc");

                    // MQTT subprotocol is required
                    let protocols = js_sys::Array::new();
                    protocols.push(&wasm_bindgen::JsValue::from_str("mqtt"));

                    match WebSocket::new_with_str_sequence(&url, &protocols) {
                        Ok(ws) => {
                            console_log("WebSocket created successfully");
                            ws.set_binary_type(BinaryType::Arraybuffer);
                            console_log("Binary type set to ArrayBuffer");

                            // A Connect that was not preceded by Close (for example a
                            // reconnect after the server closed the link) would otherwise
                            // keep the old socket and its four closures alive.
                            if let Some(stale) = websocket.take() {
                                detach_and_close(&stale);
                            }
                            closures.clear();

                            let event_sender = self.event_sender.clone();
                            console_log("Event sender cloned for closures");

                            // onopen
                            let open_events = event_sender.clone();
                            let open_reply = reply_arc.clone();
                            console_log("Creating onopen closure");
                            let onopen = Closure::wrap(Box::new(move |_: JsValue| {
                                console_log("🔥 NEW WEBSOCKET: WebSocket onopen fired 🔥");

                                // Send reply to complete the connect() await
                                console_log("Attempting to lock reply_arc in onopen");
                                match open_reply.lock() {
                                    Ok(mut reply_opt) => {
                                        console_log("Successfully locked reply_arc");
                                        match reply_opt.take() {
                                            Some(reply) => match reply.send(Ok(())) {
                                                Ok(_) => console_log(
                                                    "✅ Sent connect completion reply successfully",
                                                ),
                                                Err(_) => console_log(
                                                    "❌ Failed to send connect completion reply - receiver dropped",
                                                ),
                                            },
                                            None => console_log("❌ No reply sender in Option"),
                                        }
                                    }
                                    Err(_) => console_log("❌ Failed to lock reply_arc"),
                                }

                                match open_events.unbounded_send(UnderlyingLayerEvent::Connected) {
                                    Ok(_) => console_log("Sent Connected event successfully"),
                                    Err(e) => console_log(&format!(
                                        "Failed to send Connected event: {:?}",
                                        e
                                    )),
                                }
                            })
                                as Box<dyn FnMut(JsValue)>);
                            console_log("onopen closure created, setting on WebSocket");
                            ws.set_onopen(Some(onopen.as_ref().unchecked_ref()));
                            console_log("onopen set on WebSocket, pushing to closures vec");
                            closures.push(onopen);
                            console_log("onopen closure pushed to vec");

                            // onmessage
                            let message_events = event_sender.clone();
                            console_log("Creating onmessage closure");
                            let onmessage = Closure::wrap(Box::new(move |e: JsValue| {
                                console_log("WebSocket onmessage fired");
                                // A panic inside a browser callback would abort the wasm
                                // instance, so an unexpected value is skipped, not unwrapped.
                                let event = match e.dyn_into::<MessageEvent>() {
                                    Ok(event) => event,
                                    Err(_) => {
                                        console_log(
                                            "onmessage received a non-MessageEvent value - ignored",
                                        );
                                        return;
                                    }
                                };
                                if let Ok(array_buffer) =
                                    event.data().dyn_into::<js_sys::ArrayBuffer>()
                                {
                                    let uint8_array = js_sys::Uint8Array::new(&array_buffer);
                                    let mut data = vec![0; uint8_array.length() as usize];
                                    uint8_array.copy_to(&mut data);
                                    console_log(&format!("Received {} bytes", data.len()));
                                    match message_events
                                        .unbounded_send(UnderlyingLayerEvent::Message(data))
                                    {
                                        Ok(_) => console_log("Sent Message event successfully"),
                                        Err(e) => console_log(&format!(
                                            "Failed to send Message event: {:?}",
                                            e
                                        )),
                                    }
                                } else {
                                    console_log("Ignoring non-binary WebSocket message");
                                }
                            })
                                as Box<dyn FnMut(JsValue)>);
                            console_log("onmessage closure created, setting on WebSocket");
                            ws.set_onmessage(Some(onmessage.as_ref().unchecked_ref()));
                            console_log("onmessage set on WebSocket, pushing to closures vec");
                            closures.push(onmessage);
                            console_log("onmessage closure pushed to vec");

                            // onerror - with detailed logging
                            let error_events = event_sender.clone();
                            let onerror = Closure::wrap(Box::new(move |e: JsValue| {
                                console_log("WebSocket onerror fired");
                                console_log(&format!("Error event: {:?}", e));

                                let error_msg = match e.dyn_into::<ErrorEvent>() {
                                    Ok(error_event) => {
                                        let msg = error_event.message();
                                        console_log(&format!("ErrorEvent message: {}", msg));
                                        msg
                                    }
                                    Err(_) => {
                                        console_log("Not an ErrorEvent - unknown error");
                                        "Unknown WebSocket error".to_string()
                                    }
                                };
                                let _ = error_events
                                    .unbounded_send(UnderlyingLayerEvent::Error(error_msg));
                            })
                                as Box<dyn FnMut(JsValue)>);
                            ws.set_onerror(Some(onerror.as_ref().unchecked_ref()));
                            closures.push(onerror);

                            // onclose
                            let close_events = event_sender.clone();
                            let close_reply = reply_arc.clone();
                            let onclose = Closure::wrap(Box::new(move |e: JsValue| {
                                console_log("WebSocket onclose fired");
                                console_log(&format!("Close event: {:?}", e));

                                // Try to get close details
                                if let Ok(close_event) = e.dyn_into::<web_sys::CloseEvent>() {
                                    console_log(&format!(
                                        "Close code: {}, reason: '{}', clean: {}",
                                        close_event.code(),
                                        close_event.reason(),
                                        close_event.was_clean()
                                    ));
                                }

                                // If the socket closed before onopen ever fired, the reply
                                // sender is still parked in the slot. Dropping it wakes the
                                // awaiting side; after a successful open the slot is already
                                // empty and this is a no-op.
                                if let Ok(mut pending) = close_reply.lock() {
                                    drop(pending.take());
                                }

                                let _ = close_events.unbounded_send(UnderlyingLayerEvent::Closed);
                            })
                                as Box<dyn FnMut(JsValue)>);
                            ws.set_onclose(Some(onclose.as_ref().unchecked_ref()));
                            closures.push(onclose);

                            websocket = Some(ws);
                        }
                        Err(e) => {
                            // No handler will ever fire for this attempt, so release the
                            // reply sender now to wake whoever is awaiting the connect result.
                            if let Ok(mut pending) = reply_arc.lock() {
                                drop(pending.take());
                            }
                            let _ = self
                                .event_sender
                                .unbounded_send(UnderlyingLayerEvent::Error(format!(
                                    "Failed to create WebSocket: {:?}",
                                    e
                                )));
                        }
                    }
                }
                UnderlyingLayerCommand::SendData(data) => {
                    console_log(&format!(
                        "WebSocket SendData command: {} bytes",
                        data.len()
                    ));
                    match websocket {
                        Some(ref ws) => {
                            console_log("WebSocket is available, attempting to send");
                            match ws.send_with_u8_array(&data) {
                                Ok(_) => console_log("WebSocket send_with_u8_array succeeded"),
                                Err(e) => {
                                    console_log(&format!(
                                        "WebSocket send_with_u8_array failed: {:?}",
                                        e
                                    ));
                                    let _ = self.event_sender.unbounded_send(
                                        UnderlyingLayerEvent::Error(format!(
                                            "Send failed: {:?}",
                                            e
                                        )),
                                    );
                                }
                            }
                        }
                        None => {
                            console_log("WebSocket not available for sending");
                            let _ = self
                                .event_sender
                                .unbounded_send(UnderlyingLayerEvent::Error(
                                    "WebSocket not connected".to_string(),
                                ));
                        }
                    }
                }
                UnderlyingLayerCommand::Close => {
                    // Detach the handlers before releasing the closures they point at.
                    if let Some(ws) = websocket.take() {
                        detach_and_close(&ws);
                    }
                    closures.clear();

                    let _ = self
                        .event_sender
                        .unbounded_send(UnderlyingLayerEvent::Closed);
                    // Do NOT break - allow reconnection by continuing to process commands
                }
                UnderlyingLayerCommand::TimerReset { kind, duration_ms } => {
                    // Cancel existing timer if any
                    if let Some(old_timer_id) = self.active_timers.remove(&kind) {
                        console_log(&format!(
                            "Cancelling existing timer {} (ID: {})",
                            kind, old_timer_id
                        ));
                        crate::platform::clear_timeout(old_timer_id);
                    }

                    // Create new timer
                    let event_sender = self.event_sender.clone();
                    let timer_kind = kind.clone();
                    let callback = Closure::wrap(Box::new(move || {
                        console_log(&format!("Timer expired: {}", timer_kind));
                        let _ = event_sender
                            .unbounded_send(UnderlyingLayerEvent::TimerExpired(timer_kind.clone()));
                    }) as Box<dyn Fn()>);

                    // The platform call takes an i32 delay: clamp instead of letting a large
                    // u64 wrap around into a negative value.
                    let delay_ms = duration_ms.min(i32::MAX as u64) as i32;
                    let timer_id = crate::platform::set_timeout(&callback, delay_ms);

                    // Storing the callback keeps it alive while the timer is pending and
                    // drops the previous callback of this kind, whose timer was cleared above
                    // or has already fired.
                    timer_callbacks.insert(kind.clone(), callback);

                    self.active_timers.insert(kind.clone(), timer_id);
                    console_log(&format!(
                        "Timer set: {} (ID: {}) for {}ms",
                        kind, timer_id, duration_ms
                    ));
                }
                UnderlyingLayerCommand::TimerCancel { kind } => {
                    if let Some(timer_id) = self.active_timers.remove(&kind) {
                        console_log(&format!(
                            "Timer cancelled: {} (ID: {})",
                            kind, timer_id
                        ));
                        crate::platform::clear_timeout(timer_id);
                        // The timeout is cleared, so the callback can be released safely.
                        timer_callbacks.remove(&kind);
                    } else {
                        console_log(&format!(
                            "Timer cancel requested but not active: {}",
                            kind
                        ));
                    }
                }
            }
        }
    }
}
413}