use crate::models::WebSocketMessage;
use crate::tracing_compat::{debug, warn};
use crate::websocket::aio::WsStream;
use crate::websocket::connection_event::emit_event;
use crate::websocket::protocol::{handle_subscribed_event, parse_binary_frame, parse_text_frame};
use crate::websocket::{ConnectionEvent, DisconnectIntent, SubscriptionManager};
use futures_util::StreamExt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc as tokio_mpsc;
use tokio_tungstenite::tungstenite::Message;
#[allow(clippy::too_many_arguments)]
pub(crate) async fn dispatch_messages(
mut ws_read: WsStream,
message_tx: tokio_mpsc::Sender<WebSocketMessage>,
event_tx: mpsc::SyncSender<ConnectionEvent>,
events_dropped: Arc<AtomicU64>,
heartbeat_timeout: Option<Duration>,
subscriptions: Arc<SubscriptionManager>,
messages_dropped: Arc<AtomicU64>,
shutdown_requested: Arc<std::sync::atomic::AtomicBool>,
) -> Option<u16> {
loop {
let frame_result = match heartbeat_timeout {
Some(timeout) => match tokio::time::timeout(timeout, ws_read.next()).await {
Ok(opt) => opt,
Err(_elapsed) => {
warn!(
target: "fugle_marketdata::ws",
elapsed_ms = timeout.as_millis() as u64,
"heartbeat timeout: no inbound frame in window"
);
emit_event(&event_tx, &events_dropped, ConnectionEvent::HeartbeatTimeout {
elapsed: timeout,
});
return None;
}
},
None => ws_read.next().await,
};
let msg_result = match frame_result {
Some(r) => r,
None => {
if !shutdown_requested.load(std::sync::atomic::Ordering::SeqCst) {
emit_event(&event_tx, &events_dropped, ConnectionEvent::Disconnected {
code: None,
reason: "Connection closed".to_string(),
intent: DisconnectIntent::Network,
});
}
return None;
}
};
match msg_result {
Ok(Message::Text(text)) => {
debug!(
target: "fugle_marketdata::ws",
bytes = text.len(),
kind = "text",
"ws frame received"
);
match parse_text_frame(&text) {
Ok(ws_msg) => {
handle_subscribed_event(&subscriptions, &ws_msg);
if let Err(tokio_mpsc::error::TrySendError::Full(_)) =
message_tx.try_send(ws_msg)
{
messages_dropped.fetch_add(1, Ordering::Relaxed);
warn!(
target: "fugle_marketdata::ws",
dropped_total = messages_dropped.load(Ordering::Relaxed),
"message channel saturated; dropping frame (drop-newest)"
);
}
}
Err(e) => {
emit_event(&event_tx, &events_dropped, ConnectionEvent::Error {
message: format!("Failed to deserialize message: {}", e),
code: 2003,
});
}
}
}
Ok(Message::Binary(data)) => {
debug!(
target: "fugle_marketdata::ws",
bytes = data.len(),
kind = "binary",
"ws frame received"
);
match parse_binary_frame(&data) {
Ok(ws_msg) => {
handle_subscribed_event(&subscriptions, &ws_msg);
if let Err(tokio_mpsc::error::TrySendError::Full(_)) =
message_tx.try_send(ws_msg)
{
messages_dropped.fetch_add(1, Ordering::Relaxed);
warn!(
target: "fugle_marketdata::ws",
dropped_total = messages_dropped.load(Ordering::Relaxed),
"message channel saturated; dropping frame (drop-newest)"
);
}
}
Err(e) => {
emit_event(&event_tx, &events_dropped, ConnectionEvent::Error {
message: format!("Failed to deserialize binary message: {}", e),
code: 2003,
});
}
}
}
Ok(Message::Pong(_)) => {
}
Ok(Message::Close(close_frame)) => {
let code = close_frame.as_ref().map(|cf| cf.code.into());
let reason = close_frame
.as_ref()
.map(|cf| cf.reason.to_string())
.unwrap_or_else(|| "Server initiated close".to_string());
emit_event(&event_tx, &events_dropped, ConnectionEvent::Disconnected {
code,
reason,
intent: DisconnectIntent::Server,
});
return code;
}
Ok(Message::Ping(_)) => {
}
Err(e) => {
let err_msg = format!("WebSocket error: {}", e);
emit_event(&event_tx, &events_dropped, ConnectionEvent::Error {
message: err_msg.clone(),
code: 2001,
});
if !shutdown_requested.load(std::sync::atomic::Ordering::SeqCst) {
emit_event(&event_tx, &events_dropped, ConnectionEvent::Disconnected {
code: None,
reason: err_msg,
intent: DisconnectIntent::Network,
});
}
return None;
}
Ok(Message::Frame(_)) => {
}
}
}
}
#[allow(dead_code)] pub(crate) async fn send_pings(
mut ws_sink: crate::websocket::aio::WsSink,
ping_rx: mpsc::Receiver<()>,
) {
use futures_util::SinkExt;
while ping_rx.recv().is_ok() {
if ws_sink.send(Message::Ping(vec![].into())).await.is_err() {
break;
}
}
}