#[cfg(target_arch = "wasm32")]
use crate::log;
#[cfg(target_arch = "wasm32")]
use crate::websocket::BrowserWebSocket;
use crate::{error::*, types::*, websocket::*};
use futures::channel::{mpsc, oneshot};
use futures::stream::StreamExt;
use futures::{select, FutureExt};
use mqtt_protocol_core::mqtt;
use mqtt_protocol_core::mqtt::prelude::*;
use std::collections::HashSet;
/// Messages sent from [`MqttClient`] to the background `MqttProcessor` task.
///
/// Every variant carries a `reply` one-shot sender through which the outcome
/// of the request is reported back to the caller.
#[derive(Debug)]
pub enum Request {
/// Start connecting the underlying layer to `url`.
Connect {
url: String,
reply: oneshot::Sender<Result<()>>,
},
/// Hand an MQTT packet to the protocol connection for sending.
Send {
packet: mqtt::packet::Packet,
reply: oneshot::Sender<Result<()>>,
},
/// Ask for the next received MQTT packet; the reply may be deferred until
/// a packet is available.
Recv {
reply: oneshot::Sender<Result<mqtt::packet::Packet>>,
},
/// Ask the underlying layer to close the connection.
Close { reply: oneshot::Sender<Result<()>> },
/// Query the processor's current `ConnectionState`.
State {
reply: oneshot::Sender<ConnectionState>,
},
/// Query whether the processor's state is `ConnectionState::Connected`.
IsConnected { reply: oneshot::Sender<bool> },
/// Acquire a packet id from the protocol connection; `None` is replied when
/// the acquisition fails.
AcquirePacketId { reply: oneshot::Sender<Option<u16>> },
/// Register a caller-chosen packet id; the reply tells whether the
/// registration succeeded.
RegisterPacketId {
packet_id: u16,
reply: oneshot::Sender<bool>,
},
/// Release a packet id held by the protocol connection.
ReleasePacketId {
packet_id: u16,
reply: oneshot::Sender<Result<()>>,
},
}
/// Handle used by application code to talk to the MQTT processor.
///
/// The client itself holds no connection state; each method sends a
/// [`Request`] over `request_sender` and awaits the reply.
pub struct MqttClient {
// Sending half of the request channel consumed by `MqttProcessor::run`.
request_sender: mpsc::UnboundedSender<Request>,
}
/// State owned by the background task that drives the MQTT protocol
/// connection on top of the underlying (WebSocket) layer.
struct MqttProcessor {
// Settings applied to `mqtt_connection` at start-up and on reconnection.
config: MqttConfig,
// Connection state reported to `State` / `IsConnected` requests.
state: ConnectionState,
// Protocol state machine; produces the events handled by `handle_mqtt_events`.
mqtt_connection: mqtt::Connection<mqtt::role::Client>,
// Raw bytes received from the underlying layer.
read_buffer: Vec<u8>,
// Number of valid bytes at the front of `read_buffer`.
buffer_size: usize,
// Number of bytes in `read_buffer` already consumed by the protocol parser.
consumed_bytes: usize,
// Debug-formatted names of timers that were requested and have neither
// been cancelled nor expired yet.
active_timers: HashSet<String>,
// Internal queue: `handle_mqtt_events` pushes received packets here ...
packet_sender: mpsc::UnboundedSender<mqtt::packet::Packet>,
// ... and the `run` loop pops them for delivery.
packet_receiver: mpsc::UnboundedReceiver<mqtt::packet::Packet>,
// Reply senders of `Recv` requests still waiting for a packet (oldest first).
pending_recv_requests: Vec<oneshot::Sender<Result<mqtt::packet::Packet>>>,
// A packet that arrived while no `Recv` request was pending.
undelivered_packet: Option<mqtt::packet::Packet>,
// Events coming from the underlying layer (connected, message, timers, ...).
websocket_events: mpsc::UnboundedReceiver<UnderlyingLayerEvent>,
// Commands going to the underlying layer (connect, send, timers, close).
websocket_commands: mpsc::UnboundedSender<UnderlyingLayerCommand>,
// Clone of the client's request sender; stored but not read in this file.
#[allow(dead_code)]
request_sender: mpsc::UnboundedSender<Request>,
}
impl MqttProcessor {
/// Builds a processor wired to `websocket`'s event and command channels.
///
/// The websocket is handed back to the caller unchanged apart from having
/// its event receiver and command sender taken, so that the caller can run
/// it on its own task.
fn new<W: UnderlyingLayerInterface>(
    config: MqttConfig,
    mut websocket: W,
    request_sender: mpsc::UnboundedSender<Request>,
) -> (Self, W) {
    // Internal queue between the protocol event handler and the main loop.
    let (packet_sender, packet_receiver) = mpsc::unbounded();
    let websocket_events = websocket.event_receiver();
    let websocket_commands = websocket.command_sender();

    (
        Self {
            // Must be built before `config` is moved into the struct below.
            mqtt_connection: mqtt::Connection::<mqtt::role::Client>::new(config.version),
            config,
            state: ConnectionState::Disconnected,
            read_buffer: Vec::with_capacity(8192),
            buffer_size: 0,
            consumed_bytes: 0,
            active_timers: HashSet::new(),
            packet_sender,
            packet_receiver,
            pending_recv_requests: Vec::new(),
            undelivered_packet: None,
            websocket_events,
            websocket_commands,
            request_sender,
        },
        websocket,
    )
}
/// Main loop of the processor task.
///
/// Applies the configuration to the protocol connection, then multiplexes
/// three sources until the request stream or the websocket event stream
/// ends:
///
/// * API requests arriving on `request_receiver`,
/// * events from the underlying layer,
/// * packets queued internally by `handle_mqtt_events`.
///
/// Received packets are handed over through the single `undelivered_packet`
/// slot. The internal packet queue is therefore only polled while that slot
/// is empty; otherwise a second packet arriving before the application
/// calls `recv()` would overwrite (and lose) the first one. Packets that
/// cannot be handed over yet simply stay queued in the channel.
async fn run(&mut self, mut request_receiver: mpsc::UnboundedReceiver<Request>) {
    #[cfg(target_arch = "wasm32")]
    web_sys::console::log_1(
        &"★★★ MQTT: Starting MQTT processor main loop (NEW VERSION) ★★★".into(),
    );
    #[cfg(target_arch = "wasm32")]
    web_sys::console::log_1(
        &format!(
            "Config pingreq_send_interval_ms: {:?}",
            self.config.pingreq_send_interval_ms
        )
        .into(),
    );

    // An explicit interval overrides the one derived from CONNECT keep_alive.
    if let Some(interval) = self.config.pingreq_send_interval_ms {
        self.mqtt_connection
            .set_pingreq_send_interval(Some(interval));
        #[cfg(target_arch = "wasm32")]
        web_sys::console::log_1(&format!("Set pingreq_send_interval to {}ms", interval).into());
    } else {
        #[cfg(target_arch = "wasm32")]
        web_sys::console::log_1(
            &"pingreq_send_interval will be auto-configured from CONNECT keep_alive".into(),
        );
    }
    self.mqtt_connection
        .set_auto_pub_response(self.config.auto_pub_response);
    self.mqtt_connection
        .set_auto_ping_response(self.config.auto_ping_response);
    self.mqtt_connection
        .set_auto_map_topic_alias_send(self.config.auto_map_topic_alias_send);
    self.mqtt_connection
        .set_auto_replace_topic_alias_send(self.config.auto_replace_topic_alias_send);
    self.mqtt_connection
        .set_pingresp_recv_timeout(self.config.pingresp_recv_timeout_ms);
    #[cfg(target_arch = "wasm32")]
    web_sys::console::log_1(&"MQTT connection configured".into());

    loop {
        // Evaluated once per iteration: the hand-off slot can only change
        // inside one of the arm bodies below.
        let slot_is_free = self.undelivered_packet.is_none();

        select! {
            request = request_receiver.next().fuse() => {
                #[cfg(target_arch = "wasm32")]
                web_sys::console::log_1(&"Received API request".into());
                match request {
                    Some(req) => {
                        if !self.handle_request(req).await {
                            break;
                        }
                    }
                    // Every request sender is gone.
                    None => break,
                }
            }
            event = self.websocket_events.next().fuse() => {
                #[cfg(target_arch = "wasm32")]
                web_sys::console::log_1(&"Received WebSocket event".into());
                if let Some(event) = event {
                    self.handle_websocket_event(event).await;
                } else {
                    #[cfg(target_arch = "wasm32")]
                    web_sys::console::log_1(&"WebSocket event stream ended".into());
                    break;
                }
            }
            // A terminated future is skipped by `select!`, which leaves
            // further packets in the queue until the slot has been emptied.
            packet = if slot_is_free {
                self.packet_receiver.next().fuse()
            } else {
                futures::future::Fuse::terminated()
            } => {
                #[cfg(target_arch = "wasm32")]
                web_sys::console::log_1(&"Received packet from MQTT connection".into());
                if let Some(packet) = packet {
                    self.handle_received_packet(packet);
                }
            }
        }
    }
}
/// Executes one API request and answers it through its `reply` sender.
///
/// A failed `reply.send` only means the caller stopped waiting, so it is
/// ignored everywhere. The returned flag tells the main loop whether to
/// keep running; every request currently yields `true`.
async fn handle_request(&mut self, request: Request) -> bool {
    match request {
        Request::Connect { url, reply } => {
            // `connect` either answers through `reply` itself or forwards
            // the sender to the underlying layer.
            let _ = self.connect(&url, reply).await;
        }
        Request::Send { packet, reply } => {
            let outcome = self.send_packet(packet).await;
            let _ = reply.send(outcome);
        }
        Request::Recv { reply } => match self.undelivered_packet.take() {
            // A packet is already waiting: deliver it right away.
            Some(packet) => {
                let _ = reply.send(Ok(packet));
            }
            // Otherwise park the request until a packet arrives.
            None => self.pending_recv_requests.push(reply),
        },
        Request::Close { reply } => {
            let outcome = self.close().await;
            let _ = reply.send(outcome);
        }
        Request::State { reply } => {
            let _ = reply.send(self.state);
        }
        Request::IsConnected { reply } => {
            let connected = self.state == ConnectionState::Connected;
            let _ = reply.send(connected);
        }
        Request::AcquirePacketId { reply } => {
            let acquired = self.mqtt_connection.acquire_packet_id().ok();
            let _ = reply.send(acquired);
        }
        Request::RegisterPacketId { packet_id, reply } => {
            let registered = self.mqtt_connection.register_packet_id(packet_id).is_ok();
            let _ = reply.send(registered);
        }
        Request::ReleasePacketId { packet_id, reply } => {
            let events = self.mqtt_connection.release_packet_id(packet_id);
            let _ = self.handle_mqtt_events(events);
            let _ = reply.send(Ok(()));
        }
    }
    true
}
/// Reacts to one event reported by the underlying layer.
///
/// * `Connected` / `Error` / `Closed` update `state`; `Closed` additionally
///   notifies the protocol connection and forgets all active timers.
/// * `Message` feeds the received bytes to the protocol parser.
/// * `TimerExpired` is forwarded to the protocol connection, unless the
///   connection is closed or the timer is no longer registered as active.
async fn handle_websocket_event(&mut self, event: UnderlyingLayerEvent) {
    #[cfg(target_arch = "wasm32")]
    web_sys::console::log_1(&format!("Handling WebSocket event: {:?}", event).into());

    match event {
        UnderlyingLayerEvent::Connected => {
            #[cfg(target_arch = "wasm32")]
            web_sys::console::log_1(&"WebSocket Connected event - updating state".into());
            self.state = ConnectionState::Connected;
        }
        UnderlyingLayerEvent::Message(data) => {
            #[cfg(target_arch = "wasm32")]
            web_sys::console::log_1(
                &format!(
                    "🔥 NEW VERSION: WebSocket Message event - processing {} bytes",
                    data.len()
                )
                .into(),
            );
            self.process_incoming_data(data);
        }
        UnderlyingLayerEvent::Error(_error) => {
            #[cfg(target_arch = "wasm32")]
            web_sys::console::log_1(&format!("WebSocket Error event: {}", _error).into());
            self.state = ConnectionState::Disconnected;
        }
        UnderlyingLayerEvent::Closed => {
            #[cfg(target_arch = "wasm32")]
            web_sys::console::log_1(&"WebSocket Closed event - updating state".into());
            self.state = ConnectionState::Closed;
            let close_events = self.mqtt_connection.notify_closed();
            let _ = self.handle_mqtt_events(close_events);
            self.active_timers.clear();
            #[cfg(target_arch = "wasm32")]
            web_sys::console::log_1(&"All timers cleared on connection close".into());
        }
        UnderlyingLayerEvent::TimerExpired(timer_kind) => {
            #[cfg(target_arch = "wasm32")]
            log!("Timer expired event received: {}", timer_kind);
            #[cfg(not(target_arch = "wasm32"))]
            println!("Timer expired event received: {}", timer_kind);

            // Timers that fire after the connection went away are stale.
            if self.state == ConnectionState::Closed {
                #[cfg(target_arch = "wasm32")]
                log!("Connection closed, ignoring timer: {}", timer_kind);
                #[cfg(not(target_arch = "wasm32"))]
                println!("Connection closed, ignoring timer: {}", timer_kind);
                return;
            }

            // `remove` reports whether the timer was still registered, so a
            // single call both checks and unregisters it.
            if !self.active_timers.remove(&timer_kind) {
                #[cfg(target_arch = "wasm32")]
                log!("Timer {} was cancelled, ignoring expiration", timer_kind);
                #[cfg(not(target_arch = "wasm32"))]
                println!("Timer {} was cancelled, ignoring expiration", timer_kind);
                return;
            }
            #[cfg(target_arch = "wasm32")]
            log!(
                "Timer {} expired and removed from active timers",
                timer_kind
            );
            #[cfg(not(target_arch = "wasm32"))]
            println!(
                "Timer {} expired and removed from active timers",
                timer_kind
            );

            // Timer names are the Debug rendering of the protocol's timer kind.
            let fired = if timer_kind.contains("PingreqSend") {
                Some(mqtt::connection::TimerKind::PingreqSend)
            } else if timer_kind.contains("PingrespRecv") {
                Some(mqtt::connection::TimerKind::PingrespRecv)
            } else {
                None
            };

            match fired {
                Some(kind) => {
                    let timer_events = self.mqtt_connection.notify_timer_fired(kind);
                    let _ = self.handle_mqtt_events(timer_events);
                }
                None => {
                    #[cfg(target_arch = "wasm32")]
                    log!("Unknown timer kind: {}", timer_kind);
                    #[cfg(not(target_arch = "wasm32"))]
                    println!("Unknown timer kind: {}", timer_kind);
                }
            }
        }
    }
}
/// Delivers `packet` to the oldest `recv()` caller that is still waiting.
///
/// Waiters whose receiving end has been dropped are skipped; if nobody is
/// left, the packet is parked in `undelivered_packet` for the next `Recv`
/// request.
fn handle_received_packet(&mut self, packet: mqtt::packet::Packet) {
    let mut current = packet;
    loop {
        if self.pending_recv_requests.is_empty() {
            self.undelivered_packet = Some(current);
            return;
        }
        let waiter = self.pending_recv_requests.remove(0);
        match waiter.send(Ok(current)) {
            Ok(()) => return,
            // The waiter is gone; the channel hands the value back, so try
            // the next one with the same packet.
            Err(Ok(bounced)) => current = bounced,
            // Nothing to retry with if anything other than the packet came back.
            Err(Err(_)) => return,
        }
    }
}
async fn connect(&mut self, url: &str, reply: oneshot::Sender<Result<()>>) -> Result<()> {
if self.state == ConnectionState::Connecting || self.state == ConnectionState::Connected {
let _ = reply.send(Err(Error::Other(
"Already connecting or connected".to_string(),
)));
return Ok(());
}
if self.state == ConnectionState::Closed {
self.reset_for_reconnection();
}
self.state = ConnectionState::Connecting;
let reply_arc = std::sync::Arc::new(std::sync::Mutex::new(Some(reply)));
#[cfg(target_arch = "wasm32")]
web_sys::console::log_1(&"Sending Connect command with reply_arc".into());
let _ = self
.websocket_commands
.unbounded_send(UnderlyingLayerCommand::Connect(url.to_string(), reply_arc));
Ok(())
}
/// Passes `packet` to the protocol connection and processes the events it
/// produces; the result of that event processing is returned unchanged.
async fn send_packet(&mut self, packet: mqtt::packet::Packet) -> Result<()> {
    #[cfg(target_arch = "wasm32")]
    web_sys::console::log_1(&format!("Sending MQTT packet: {:?}", packet).into());

    let produced = self.mqtt_connection.send(packet);

    #[cfg(target_arch = "wasm32")]
    web_sys::console::log_1(&format!("MQTT send returned {} events", produced.len()).into());

    self.handle_mqtt_events(produced)
}
/// Asks the underlying layer to close and marks the connection as closed.
///
/// The command is sent on a best-effort basis: a failure to queue it is
/// ignored and the call still reports success.
async fn close(&mut self) -> Result<()> {
    self.websocket_commands
        .unbounded_send(UnderlyingLayerCommand::Close)
        .ok();
    self.state = ConnectionState::Closed;
    Ok(())
}
/// Appends `data` to the read buffer and feeds the buffered bytes to the
/// protocol parser.
///
/// One message from the underlying layer may carry more than one MQTT
/// packet, so the parser is invoked repeatedly until the buffer is drained
/// or a call consumes nothing. Calling it only once could leave trailing
/// packets unprocessed until further data arrives.
/// NOTE(review): this assumes `recv` yields at most one packet per call;
/// if it already drains the cursor, the loop simply runs once.
fn process_incoming_data(&mut self, data: Vec<u8>) {
    // Drop the bytes the parser has already consumed so the buffer does not
    // grow without bound.
    if self.consumed_bytes > 0 {
        self.read_buffer
            .copy_within(self.consumed_bytes..self.buffer_size, 0);
        self.buffer_size -= self.consumed_bytes;
        self.consumed_bytes = 0;
    }

    // Keep exactly the valid bytes, then append the new ones.
    self.read_buffer.truncate(self.buffer_size);
    self.read_buffer.extend_from_slice(&data);
    self.buffer_size = self.read_buffer.len();

    while self.consumed_bytes < self.buffer_size {
        let pending = &self.read_buffer[self.consumed_bytes..self.buffer_size];
        let mut cursor = mqtt::common::Cursor::new(pending);
        let events = self.mqtt_connection.recv(&mut cursor);
        let advanced = cursor.position() as usize;
        self.consumed_bytes += advanced;
        let _ = self.handle_mqtt_events(events);

        // Guard against spinning if the parser makes no progress.
        if advanced == 0 {
            break;
        }
    }
}
#[allow(unused_variables)]
fn handle_mqtt_events(&mut self, events: Vec<mqtt::connection::Event>) -> Result<()> {
#[cfg(target_arch = "wasm32")]
web_sys::console::log_1(
&format!(
"🚀🚀🚀 FORCE UPDATE: Processing {} MQTT events 🚀🚀🚀",
events.len()
)
.into(),
);
for (i, event) in events.iter().enumerate() {
#[cfg(target_arch = "wasm32")]
web_sys::console::log_1(&format!("Event {}: {:?}", i + 1, event).into());
}
for event in events {
match event {
mqtt::connection::Event::RequestSendPacket { packet, .. } => {
let buffer = packet.to_continuous_buffer();
#[cfg(target_arch = "wasm32")]
web_sys::console::log_1(
&format!("Sending packet: {} bytes", buffer.len()).into(),
);
match self
.websocket_commands
.unbounded_send(UnderlyingLayerCommand::SendData(buffer))
{
Ok(_) => {
#[cfg(target_arch = "wasm32")]
web_sys::console::log_1(
&"Sent packet successfully via WebSocket command".into(),
);
}
Err(e) => {
#[cfg(target_arch = "wasm32")]
web_sys::console::log_1(
&format!("Failed to send packet via WebSocket command: {:?}", e)
.into(),
);
}
}
}
mqtt::connection::Event::NotifyPacketReceived(packet) => {
if self.packet_sender.unbounded_send(packet).is_err() {
eprintln!("Failed to forward received packet");
}
}
mqtt::connection::Event::RequestTimerReset { kind, duration_ms } => {
let kind_str = format!("{:?}", kind);
self.active_timers.insert(kind_str.clone());
let _ = self.websocket_commands.unbounded_send(
UnderlyingLayerCommand::TimerReset {
kind: kind_str,
duration_ms,
},
);
}
mqtt::connection::Event::RequestTimerCancel(kind) => {
let kind_str = format!("{:?}", kind);
self.active_timers.remove(&kind_str);
let _ = self
.websocket_commands
.unbounded_send(UnderlyingLayerCommand::TimerCancel { kind: kind_str });
}
mqtt::connection::Event::NotifyError(error) => {
eprintln!("MQTT protocol error: {:?}", error);
}
mqtt::connection::Event::RequestClose => {
let _ = self
.websocket_commands
.unbounded_send(UnderlyingLayerCommand::Close);
self.state = ConnectionState::Closed;
}
_ => {}
}
}
Ok(())
}
/// Discards all per-connection state so that a fresh connection can be made.
///
/// Replaces the protocol connection with a newly configured one, empties
/// the read buffer, drops every parked `recv()` request together with any
/// undelivered packet, and forgets all active timers.
fn reset_for_reconnection(&mut self) {
    #[cfg(target_arch = "wasm32")]
    log!("Resetting internal state for reconnection");

    // Configure the replacement before it takes the place of the old one.
    let mut fresh = mqtt::Connection::<mqtt::role::Client>::new(self.config.version);
    if let Some(ms) = self.config.pingreq_send_interval_ms {
        fresh.set_pingreq_send_interval(Some(ms));
    }
    fresh.set_auto_pub_response(self.config.auto_pub_response);
    fresh.set_auto_ping_response(self.config.auto_ping_response);
    fresh.set_auto_map_topic_alias_send(self.config.auto_map_topic_alias_send);
    fresh.set_auto_replace_topic_alias_send(self.config.auto_replace_topic_alias_send);
    fresh.set_pingresp_recv_timeout(self.config.pingresp_recv_timeout_ms);
    self.mqtt_connection = fresh;

    // Receive-side buffering.
    self.read_buffer.clear();
    self.buffer_size = 0;
    self.consumed_bytes = 0;

    // Delivery bookkeeping and timers.
    self.pending_recv_requests.clear();
    self.undelivered_packet = None;
    self.active_timers.clear();

    #[cfg(target_arch = "wasm32")]
    log!("Internal state reset complete for reconnection");
}
}
impl MqttClient {
#[cfg(target_arch = "wasm32")]
pub fn new(config: MqttConfig) -> Self {
Self::new_with_websocket(config, BrowserWebSocket::new())
}
#[cfg(target_arch = "wasm32")]
pub fn new_with_websocket<W: UnderlyingLayerInterface + 'static>(
config: MqttConfig,
websocket: W,
) -> Self {
let (request_sender, request_receiver) = mpsc::unbounded();
let (mut processor, mut websocket) =
MqttProcessor::new(config, websocket, request_sender.clone());
use wasm_bindgen_futures::spawn_local;
#[cfg(target_arch = "wasm32")]
web_sys::console::log_1(&"Starting WebSocket processor task".into());
spawn_local(async move {
websocket.run().await;
});
#[cfg(target_arch = "wasm32")]
web_sys::console::log_1(&"Starting MQTT processor task".into());
spawn_local(async move {
processor.run(request_receiver).await;
});
#[cfg(target_arch = "wasm32")]
web_sys::console::log_1(
&"★★★ CLIENT_CLEAN: Both processors started, returning client ★★★".into(),
);
Self { request_sender }
}
#[cfg(not(target_arch = "wasm32"))]
pub fn new_with_websocket<W: UnderlyingLayerInterface + Send + 'static>(
config: MqttConfig,
websocket: W,
) -> Self {
let (request_sender, request_receiver) = mpsc::unbounded();
let (mut processor, mut websocket) =
MqttProcessor::new(config, websocket, request_sender.clone());
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(websocket.run());
});
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(processor.run(request_receiver));
});
Self { request_sender }
}
pub async fn connect(&self, url: &str) -> Result<()> {
let (reply_sender, reply_receiver) = oneshot::channel();
let request = Request::Connect {
url: url.to_string(),
reply: reply_sender,
};
self.request_sender
.unbounded_send(request)
.map_err(|_| Error::Other("Client channel closed".to_string()))?;
reply_receiver
.await
.map_err(|_| Error::Other("Request cancelled".to_string()))?
}
pub async fn send(&self, packet: mqtt::packet::Packet) -> Result<()> {
let (reply_sender, reply_receiver) = oneshot::channel();
let request = Request::Send {
packet,
reply: reply_sender,
};
self.request_sender
.unbounded_send(request)
.map_err(|_| Error::Other("Client channel closed".to_string()))?;
reply_receiver
.await
.map_err(|_| Error::Other("Request cancelled".to_string()))?
}
pub async fn recv(&self) -> Result<mqtt::packet::Packet> {
let (reply_sender, reply_receiver) = oneshot::channel();
let request = Request::Recv {
reply: reply_sender,
};
self.request_sender
.unbounded_send(request)
.map_err(|_| Error::Other("Client channel closed".to_string()))?;
reply_receiver
.await
.map_err(|_| Error::Other("Request cancelled".to_string()))?
}
pub async fn close(&self) -> Result<()> {
let (reply_sender, reply_receiver) = oneshot::channel();
let request = Request::Close {
reply: reply_sender,
};
self.request_sender
.unbounded_send(request)
.map_err(|_| Error::Other("Client channel closed".to_string()))?;
reply_receiver
.await
.map_err(|_| Error::Other("Request cancelled".to_string()))?
}
pub async fn state(&self) -> ConnectionState {
let (reply_sender, reply_receiver) = oneshot::channel();
let request = Request::State {
reply: reply_sender,
};
if self.request_sender.unbounded_send(request).is_err() {
return ConnectionState::Closed;
}
reply_receiver.await.unwrap_or(ConnectionState::Closed)
}
pub async fn is_connected(&self) -> bool {
let (reply_sender, reply_receiver) = oneshot::channel();
let request = Request::IsConnected {
reply: reply_sender,
};
if self.request_sender.unbounded_send(request).is_err() {
return false;
}
reply_receiver.await.unwrap_or(false)
}
pub async fn acquire_packet_id(&self) -> Option<u16> {
let (reply_sender, reply_receiver) = oneshot::channel();
let request = Request::AcquirePacketId {
reply: reply_sender,
};
if self.request_sender.unbounded_send(request).is_err() {
return None;
}
reply_receiver.await.unwrap_or(None)
}
pub async fn register_packet_id(&self, packet_id: u16) -> bool {
let (reply_sender, reply_receiver) = oneshot::channel();
let request = Request::RegisterPacketId {
packet_id,
reply: reply_sender,
};
if self.request_sender.unbounded_send(request).is_err() {
return false;
}
reply_receiver.await.unwrap_or(false)
}
pub async fn release_packet_id(&self, packet_id: u16) -> Result<()> {
let (reply_sender, reply_receiver) = oneshot::channel();
let request = Request::ReleasePacketId {
packet_id,
reply: reply_sender,
};
self.request_sender
.unbounded_send(request)
.map_err(|_| Error::Other("Client channel closed".to_string()))?;
reply_receiver
.await
.map_err(|_| Error::Other("Request cancelled".to_string()))?
}
}