use crate::models::WebSocketMessage;
use crate::websocket::connection_event::emit_event;
use crate::websocket::protocol::{
classify_auth_response, frame_auth, frame_subscribe_raw, parse_binary_frame, parse_text_frame,
AuthOutcome,
};
use crate::websocket::{
ConnectionConfig, ConnectionEvent, ConnectionState, DisconnectIntent, HealthCheckConfig,
ReconnectionManager, SubscriptionManager,
};
use crate::MarketDataError;
use crate::tracing_compat::{debug, warn};
use std::io::ErrorKind;
use std::net::TcpStream;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
use tungstenite::stream::MaybeTlsStream;
use tungstenite::{Connector, Message, WebSocket};
/// Socket read timeout installed by `owner_loop`, so a blocking read returns
/// periodically and the loop can re-check the stop flag, the heartbeat window
/// and the outbound write queue.
const READ_POLL_INTERVAL: Duration = Duration::from_millis(200);
/// Time budget for the authentication exchange in `do_auth_handshake`.
const AUTH_TIMEOUT: Duration = Duration::from_secs(10);
/// Capacity of the bounded outbound text-frame queue created in
/// `reconnect_and_authenticate`.
pub(crate) const WRITE_QUEUE_CAPACITY: usize = 64;
/// Longest `owner_loop` waits for the peer's close acknowledgement after a
/// client-requested shutdown.
const CLOSE_ACK_DEADLINE: Duration = Duration::from_secs(2);
/// Blocking tungstenite WebSocket over a `MaybeTlsStream`-wrapped `TcpStream`.
pub(crate) type SyncWs = WebSocket<MaybeTlsStream<TcpStream>>;
/// Best-effort flush of every frame currently waiting in `write_rx`.
///
/// Frames are sent as text messages in queue order. The first send failure
/// ends the flush; the error itself is discarded because this runs on the
/// shutdown path where nothing can act on it.
fn drain_write_queue(ws: &mut SyncWs, write_rx: &mpsc::Receiver<String>) {
    for pending in write_rx.try_iter() {
        let sent = ws.send(Message::Text(pending.into()));
        if sent.is_err() {
            break;
        }
    }
}
fn await_close_ack(ws: &mut SyncWs, deadline: Duration) {
let stop_at = Instant::now() + deadline;
set_read_timeout(ws, Some(Duration::from_millis(50)));
while Instant::now() < stop_at {
match ws.read() {
Ok(Message::Close(_)) => return,
Err(tungstenite::Error::ConnectionClosed)
| Err(tungstenite::Error::AlreadyClosed) => return,
Err(tungstenite::Error::Io(e))
if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut =>
{
continue;
}
Err(_) => return,
Ok(_) => continue,
}
}
}
/// State shared between the supervisor thread (`run_supervisor`) and the rest
/// of the WebSocket client.
pub(crate) struct OwnerShared {
/// Connection settings handed to `do_blocking_connect` and `do_auth_handshake`.
pub config: ConnectionConfig,
/// TLS client configuration cloned into every connect attempt.
pub tls_config: Arc<rustls::ClientConfig>,
/// Heartbeat settings; `owner_loop` reads `enabled` and `heartbeat_timeout`.
pub health: HealthCheckConfig,
/// Backoff/retry bookkeeping consulted by the supervisor between connections.
pub reconnection: Mutex<ReconnectionManager>,
/// Current connection state, overwritten through `set_state`.
pub state: Arc<RwLock<ConnectionState>>,
/// Subscription registry; re-sent after each successful reconnect.
pub subscriptions: Arc<SubscriptionManager>,
/// Bounded channel on which lifecycle events are published via `emit_event`.
pub event_tx: mpsc::SyncSender<ConnectionEvent>,
/// Bounded channel on which parsed inbound frames are published.
pub message_tx: mpsc::SyncSender<WebSocketMessage>,
/// Sender half of the current outbound write queue; replaced on every reconnect.
pub write_tx_slot: Mutex<Option<mpsc::SyncSender<String>>>,
/// Set to request shutdown; polled by `owner_loop` and `run_supervisor`.
pub should_stop: Arc<AtomicBool>,
/// Bumped each time an inbound frame is dropped because `message_tx` is full.
pub messages_dropped: crate::metrics_compat::DropCounter,
/// Passed to `emit_event` alongside `event_tx`; presumably counts events
/// dropped when that channel is full — confirm in `emit_event`.
pub events_dropped: crate::metrics_compat::DropCounter,
}
/// Opens a TCP connection to `config.url` and performs the (TLS +) WebSocket
/// handshake, blocking the calling thread.
///
/// Every address returned by name resolution is tried in order, each with
/// `config.connect_timeout`, until one accepts the connection. The worst-case
/// connect time is therefore `connect_timeout` multiplied by the number of
/// resolved addresses.
///
/// # Errors
/// Returns `MarketDataError::ConnectionError` when the URL is invalid or lacks
/// a host/port, when resolution fails or yields no address, when no resolved
/// address accepts the TCP connection (the last I/O error is reported), or
/// when the WebSocket handshake fails.
pub(crate) fn do_blocking_connect(
    config: &ConnectionConfig,
    tls_config: Arc<rustls::ClientConfig>,
) -> Result<SyncWs, MarketDataError> {
    use std::net::ToSocketAddrs;

    let url: url::Url = config.url.parse().map_err(|e: url::ParseError| {
        MarketDataError::ConnectionError {
            msg: format!("Invalid URL: {e}"),
        }
    })?;
    let host = url.host_str().ok_or_else(|| MarketDataError::ConnectionError {
        msg: "URL missing host".to_string(),
    })?;
    let port = url.port_or_known_default().ok_or_else(|| {
        MarketDataError::ConnectionError {
            msg: "URL missing port".to_string(),
        }
    })?;

    // `host_str` renders IPv6 literals in URL form ("[::1]"); the resolver
    // only understands the bare address, so strip a matching bracket pair.
    let lookup_host = host
        .strip_prefix('[')
        .and_then(|inner| inner.strip_suffix(']'))
        .unwrap_or(host);

    let addrs = (lookup_host, port)
        .to_socket_addrs()
        .map_err(|e| MarketDataError::ConnectionError {
            msg: format!("DNS lookup failed: {e}"),
        })?;

    // Try each resolved address in turn so that one unreachable record
    // (e.g. an IPv6 address on an IPv4-only network) does not fail the connect.
    let mut connected = None;
    let mut last_err = None;
    for addr in addrs {
        match TcpStream::connect_timeout(&addr, config.connect_timeout) {
            Ok(stream) => {
                connected = Some(stream);
                break;
            }
            Err(e) => last_err = Some(e),
        }
    }
    let tcp = match connected {
        Some(stream) => stream,
        None => {
            let msg = match last_err {
                Some(e) => format!("TCP connect failed: {e}"),
                // The loop never ran: resolution produced nothing.
                None => "DNS returned no addresses".to_string(),
            };
            return Err(MarketDataError::ConnectionError { msg });
        }
    };

    // Disabling Nagle is a latency optimisation only; failure is not fatal.
    tcp.set_nodelay(true).ok();

    let connector = Connector::Rustls(tls_config);
    let (ws, _resp) = tungstenite::client_tls_with_config(
        config.url.as_str(),
        tcp,
        None,
        Some(connector),
    )
    .map_err(|e| MarketDataError::ConnectionError {
        msg: format!("WebSocket handshake failed: {e}"),
    })?;
    Ok(ws)
}
/// Applies read timeout `t` to the TCP socket underneath `ws`.
///
/// Handles the plain and rustls stream variants; any other variant is left
/// untouched. Failures to set the timeout are deliberately ignored.
pub(crate) fn set_read_timeout(ws: &mut SyncWs, t: Option<Duration>) {
    let outcome = match ws.get_mut() {
        MaybeTlsStream::Plain(tcp) => tcp.set_read_timeout(t),
        MaybeTlsStream::Rustls(tls) => tls.sock.set_read_timeout(t),
        _ => Ok(()),
    };
    // Best effort: a socket that rejects the timeout keeps its previous one.
    let _ = outcome;
}
/// Sends the authentication frame and blocks until the server accepts or
/// rejects it, or until `AUTH_TIMEOUT` has elapsed.
///
/// Every text frame that parses successfully during the exchange is also
/// offered to `message_tx` (dropped silently if that channel is full). Frames
/// that fail to parse, and non-text frames other than close, are skipped.
///
/// On return the socket's read timeout is whatever was left of the auth
/// budget; callers that keep reading must install their own timeout.
///
/// # Errors
/// * `MarketDataError::TimeoutError` when no verdict arrives within `AUTH_TIMEOUT`.
/// * `MarketDataError::AuthError` when the server rejects the credentials.
/// * `MarketDataError::ConnectionError` when the frame cannot be sent, the
///   peer closes the stream, or a non-timeout read error occurs.
/// * Whatever `frame_auth` returns when the auth frame cannot be built.
pub(crate) fn do_auth_handshake(
    ws: &mut SyncWs,
    config: &ConnectionConfig,
    message_tx: &mpsc::SyncSender<WebSocketMessage>,
) -> Result<(), MarketDataError> {
    let auth_json = frame_auth(config.auth.clone())?;
    ws.send(Message::Text(auth_json.into())).map_err(|e| {
        MarketDataError::ConnectionError {
            msg: format!("Failed to send auth frame: {e}"),
        }
    })?;

    let deadline = Instant::now() + AUTH_TIMEOUT;
    loop {
        // Arm each read with only the time that is left. A fixed
        // AUTH_TIMEOUT per read would let a late unrelated frame push the
        // total wait to almost twice the intended budget.
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return Err(MarketDataError::TimeoutError {
                operation: "WebSocket authentication".to_string(),
            });
        }
        set_read_timeout(ws, Some(remaining));

        match ws.read() {
            Ok(Message::Text(text)) => {
                let parsed = parse_text_frame(&text);
                if let Ok(ws_msg) = parsed {
                    // Consumers see auth-phase frames too; a full channel is not fatal here.
                    let _ = message_tx.try_send(ws_msg.clone());
                    match classify_auth_response(&ws_msg) {
                        AuthOutcome::Authenticated => return Ok(()),
                        AuthOutcome::Failed(msg) => {
                            return Err(MarketDataError::AuthError { msg });
                        }
                        AuthOutcome::Pending => continue,
                    }
                }
            }
            Ok(Message::Close(_)) => {
                return Err(MarketDataError::ConnectionError {
                    msg: "Stream closed during authentication".to_string(),
                });
            }
            Ok(_) => continue,
            // Read timeout: loop back so the deadline check decides.
            Err(tungstenite::Error::Io(e))
                if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut =>
            {
                continue;
            }
            Err(e) => {
                return Err(MarketDataError::ConnectionError {
                    msg: format!("Auth read error: {e}"),
                });
            }
        }
    }
}
fn owner_loop(
mut ws: SyncWs,
write_rx: mpsc::Receiver<String>,
shared: &OwnerShared,
) -> Option<u16> {
set_read_timeout(&mut ws, Some(READ_POLL_INTERVAL));
let heartbeat_window = if shared.health.enabled {
Some(shared.health.heartbeat_timeout)
} else {
None
};
let mut last_activity = Instant::now();
loop {
if shared.should_stop.load(Ordering::SeqCst) {
drain_write_queue(&mut ws, &write_rx);
let _ = ws.close(None);
let _ = ws.flush();
await_close_ack(&mut ws, CLOSE_ACK_DEADLINE);
return Some(1000);
}
match ws.read() {
Ok(Message::Text(text)) => {
last_activity = Instant::now();
debug!(
target: "fugle_marketdata::ws",
bytes = text.len(),
kind = "text",
"ws frame received"
);
match parse_text_frame(&text) {
Ok(ws_msg) => {
crate::websocket::protocol::handle_subscribed_event(
&shared.subscriptions,
&ws_msg,
);
if let Err(mpsc::TrySendError::Full(_)) =
shared.message_tx.try_send(ws_msg)
{
shared.messages_dropped.bump();
warn!(
target: "fugle_marketdata::ws",
dropped_total = shared.messages_dropped.load(),
"message channel saturated; dropping frame (drop-newest)"
);
}
}
Err(e) => {
emit_event(&shared.event_tx, &shared.events_dropped, ConnectionEvent::Error {
message: format!("Failed to deserialize message: {e}"),
code: 2003,
});
}
}
}
Ok(Message::Binary(data)) => {
last_activity = Instant::now();
debug!(
target: "fugle_marketdata::ws",
bytes = data.len(),
kind = "binary",
"ws frame received"
);
match parse_binary_frame(&data) {
Ok(ws_msg) => {
crate::websocket::protocol::handle_subscribed_event(
&shared.subscriptions,
&ws_msg,
);
if let Err(mpsc::TrySendError::Full(_)) =
shared.message_tx.try_send(ws_msg)
{
shared.messages_dropped.bump();
warn!(
target: "fugle_marketdata::ws",
dropped_total = shared.messages_dropped.load(),
"message channel saturated; dropping frame (drop-newest)"
);
}
}
Err(e) => {
emit_event(&shared.event_tx, &shared.events_dropped, ConnectionEvent::Error {
message: format!("Failed to deserialize binary message: {e}"),
code: 2003,
});
}
}
}
Ok(Message::Ping(payload)) => {
last_activity = Instant::now();
let _ = ws.send(Message::Pong(payload));
}
Ok(Message::Pong(_)) => {
last_activity = Instant::now();
}
Ok(Message::Close(frame)) => {
let code = frame.as_ref().map(|cf| u16::from(cf.code));
let reason = frame
.as_ref()
.map(|cf| cf.reason.to_string())
.unwrap_or_else(|| "Server initiated close".to_string());
emit_event(&shared.event_tx, &shared.events_dropped, ConnectionEvent::Disconnected {
code,
reason,
intent: DisconnectIntent::Server,
});
let _ = ws.close(None);
return code;
}
Ok(Message::Frame(_)) => {
last_activity = Instant::now();
}
Err(tungstenite::Error::Io(e))
if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut =>
{
}
Err(tungstenite::Error::ConnectionClosed)
| Err(tungstenite::Error::AlreadyClosed) => {
if !shared.should_stop.load(Ordering::SeqCst) {
emit_event(&shared.event_tx, &shared.events_dropped, ConnectionEvent::Disconnected {
code: None,
reason: "Connection closed".to_string(),
intent: DisconnectIntent::Network,
});
}
return None;
}
Err(e) => {
let err_msg = format!("WebSocket read error: {e}");
emit_event(&shared.event_tx, &shared.events_dropped, ConnectionEvent::Error {
message: err_msg.clone(),
code: 2001,
});
if !shared.should_stop.load(Ordering::SeqCst) {
emit_event(&shared.event_tx, &shared.events_dropped, ConnectionEvent::Disconnected {
code: None,
reason: err_msg,
intent: DisconnectIntent::Network,
});
}
return None;
}
}
if let Some(window) = heartbeat_window {
if last_activity.elapsed() > window {
emit_event(&shared.event_tx, &shared.events_dropped, ConnectionEvent::HeartbeatTimeout {
elapsed: window,
});
return None;
}
}
loop {
match write_rx.try_recv() {
Ok(json) => {
if let Err(e) = ws.send(Message::Text(json.into())) {
emit_event(&shared.event_tx, &shared.events_dropped, ConnectionEvent::Error {
message: format!("WebSocket write error: {e}"),
code: 2002,
});
return None;
}
}
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => {
let _ = ws.close(None);
return Some(1000);
}
}
}
}
}
fn reconnect_and_authenticate(
shared: &Arc<OwnerShared>,
) -> Result<(SyncWs, mpsc::Receiver<String>), MarketDataError> {
set_state(shared, ConnectionState::Connecting);
emit_event(&shared.event_tx, &shared.events_dropped, ConnectionEvent::Connecting {
});
let mut ws = do_blocking_connect(&shared.config, Arc::clone(&shared.tls_config))?;
crate::tracing_compat::info!(target: "fugle_marketdata::ws", "ws reconnected");
emit_event(&shared.event_tx, &shared.events_dropped, ConnectionEvent::Connected {
});
set_state(shared, ConnectionState::Authenticating);
do_auth_handshake(&mut ws, &shared.config, &shared.message_tx)?;
let (write_tx, write_rx) = mpsc::sync_channel::<String>(WRITE_QUEUE_CAPACITY);
*shared.write_tx_slot.lock().expect("write_tx_slot lock poisoned") = Some(write_tx.clone());
{
let mut mgr = shared.reconnection.lock().expect("reconnection lock poisoned");
mgr.reset();
}
shared.subscriptions.clear_server_ids();
for req in shared.subscriptions.get_all() {
if let Ok(json) = frame_subscribe_raw(req) {
let _ = write_tx.send(json);
}
}
set_state(shared, ConnectionState::Connected);
crate::tracing_compat::info!(target: "fugle_marketdata::ws", "ws re-authenticated");
emit_event(&shared.event_tx, &shared.events_dropped, ConnectionEvent::Authenticated {
});
Ok((ws, write_rx))
}
pub(crate) fn run_supervisor(
initial_ws: SyncWs,
initial_write_rx: mpsc::Receiver<String>,
shared: Arc<OwnerShared>,
) {
let mut connection = Some((initial_ws, initial_write_rx));
loop {
let (ws, write_rx) = match connection.take() {
Some(c) => c,
None => return,
};
let close_code = owner_loop(ws, write_rx, &shared);
if shared.should_stop.load(Ordering::SeqCst) {
set_state(&shared, ConnectionState::Closed {
code: Some(1000),
reason: "Client disconnected".to_string(),
intent: DisconnectIntent::Client,
});
return;
}
let should_reconnect = {
let mgr = shared.reconnection.lock().expect("reconnection lock poisoned");
mgr.should_reconnect(close_code)
};
if !should_reconnect {
let attempts = {
let mgr = shared.reconnection.lock().expect("reconnection lock poisoned");
mgr.current_attempt()
};
set_state(&shared, ConnectionState::Closed {
code: close_code,
reason: "Non-retriable error".to_string(),
intent: DisconnectIntent::Network,
});
emit_event(&shared.event_tx, &shared.events_dropped, ConnectionEvent::ReconnectFailed {
attempts,
});
return;
}
let new_conn = loop {
if shared.should_stop.load(Ordering::SeqCst) {
return;
}
let delay = {
let mut mgr = shared.reconnection.lock().expect("reconnection lock poisoned");
mgr.next_delay()
};
let Some(d) = delay else {
let attempts = {
let mgr = shared.reconnection.lock().expect("reconnection lock poisoned");
mgr.current_attempt()
};
set_state(&shared, ConnectionState::Closed {
code: close_code,
reason: "Max reconnection attempts reached".to_string(),
intent: DisconnectIntent::Network,
});
emit_event(&shared.event_tx, &shared.events_dropped, ConnectionEvent::ReconnectFailed {
attempts,
});
return;
};
let attempt = {
let mgr = shared.reconnection.lock().expect("reconnection lock poisoned");
mgr.current_attempt()
};
set_state(&shared, ConnectionState::Reconnecting { attempt });
warn!(
target: "fugle_marketdata::ws",
attempt,
delay_ms = d.as_millis() as u64,
"ws reconnect attempt"
);
emit_event(&shared.event_tx, &shared.events_dropped, ConnectionEvent::Reconnecting {
attempt,
});
std::thread::sleep(d);
match reconnect_and_authenticate(&shared) {
Ok(pair) => break Some(pair),
Err(e) => {
if let MarketDataError::AuthError { msg } = &e {
emit_event(&shared.event_tx, &shared.events_dropped, ConnectionEvent::Unauthenticated {
message: msg.clone(),
});
} else {
emit_event(&shared.event_tx, &shared.events_dropped, ConnectionEvent::Error {
message: e.to_string(),
code: e.to_error_code(),
});
}
continue;
}
}
};
connection = new_conn;
}
}
/// Overwrites the shared connection state with `new_state`.
///
/// # Panics
/// Panics if the state lock is poisoned.
fn set_state(shared: &OwnerShared, new_state: ConnectionState) {
    *shared.state.write().expect("state lock poisoned") = new_state;
}
// NOTE(review): no-op with no caller visible in this file; presumably kept
// only so the `AtomicBool` import stays referenced — confirm before removing.
#[allow(dead_code)]
fn _atomic_bool_used(_: &AtomicBool) {}