use crate::models::WebSocketMessage;
use crate::websocket::aio::WsStream;
use crate::websocket::connection_event::emit_event;
use crate::websocket::protocol::{handle_subscribed_event, parse_binary_frame, parse_text_frame};
use crate::websocket::{ConnectionEvent, SubscriptionManager};
use futures_util::StreamExt;
use std::sync::mpsc;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc as tokio_mpsc;
use tokio_tungstenite::tungstenite::Message;
pub(crate) async fn dispatch_messages(
mut ws_read: WsStream,
message_tx: tokio_mpsc::Sender<WebSocketMessage>,
event_tx: mpsc::SyncSender<ConnectionEvent>,
heartbeat_timeout: Option<Duration>,
subscriptions: Arc<SubscriptionManager>,
) -> Option<u16> {
loop {
let frame_result = match heartbeat_timeout {
Some(timeout) => match tokio::time::timeout(timeout, ws_read.next()).await {
Ok(opt) => opt,
Err(_elapsed) => {
emit_event(&event_tx, ConnectionEvent::HeartbeatTimeout {
elapsed: timeout,
});
return None;
}
},
None => ws_read.next().await,
};
let msg_result = match frame_result {
Some(r) => r,
None => {
emit_event(&event_tx, ConnectionEvent::Disconnected {
code: None,
reason: "Connection closed".to_string(),
});
return None;
}
};
match msg_result {
Ok(Message::Text(text)) => {
match parse_text_frame(&text) {
Ok(ws_msg) => {
handle_subscribed_event(&subscriptions, &ws_msg);
if message_tx.send(ws_msg).await.is_err() {
return None;
}
}
Err(e) => {
emit_event(&event_tx, ConnectionEvent::Error {
message: format!("Failed to deserialize message: {}", e),
code: 2003,
});
}
}
}
Ok(Message::Binary(data)) => {
match parse_binary_frame(&data) {
Ok(ws_msg) => {
handle_subscribed_event(&subscriptions, &ws_msg);
if message_tx.send(ws_msg).await.is_err() {
return None;
}
}
Err(e) => {
emit_event(&event_tx, ConnectionEvent::Error {
message: format!("Failed to deserialize binary message: {}", e),
code: 2003,
});
}
}
}
Ok(Message::Pong(_)) => {
}
Ok(Message::Close(close_frame)) => {
let code = close_frame.as_ref().map(|cf| cf.code.into());
let reason = close_frame
.as_ref()
.map(|cf| cf.reason.to_string())
.unwrap_or_else(|| "Server initiated close".to_string());
emit_event(&event_tx, ConnectionEvent::Disconnected {
code,
reason,
});
return code;
}
Ok(Message::Ping(_)) => {
}
Err(e) => {
emit_event(&event_tx, ConnectionEvent::Error {
message: format!("WebSocket error: {}", e),
code: 2001,
});
return None;
}
Ok(Message::Frame(_)) => {
}
}
}
}
#[allow(dead_code)] pub(crate) async fn send_pings(
mut ws_sink: crate::websocket::aio::WsSink,
ping_rx: mpsc::Receiver<()>,
) {
use futures_util::SinkExt;
while ping_rx.recv().is_ok() {
if ws_sink.send(Message::Ping(vec![].into())).await.is_err() {
break;
}
}
}