use crate::models::{
AuthRequest, SubscribeRequest, Symbols, UnsubscribeRequest, WebSocketMessage,
WebSocketRequest,
};
use crate::websocket::channels::{FutOptSubscription, StockSubscription};
use crate::websocket::SubscriptionManager;
use crate::MarketDataError;
/// Classification of a server message with respect to authentication.
///
/// Produced by `classify_auth_response`.
pub(crate) enum AuthOutcome {
/// The message reported success (`is_authenticated()` returned true).
Authenticated,
/// The message was an error; carries its error text, or "Unknown error"
/// when the message supplied none.
Failed(String),
/// The message was neither an authentication success nor an error.
Pending,
}
/// Builds the JSON text frame for an authentication request.
///
/// The `auth` payload is wrapped with `WebSocketRequest::auth` and serialized
/// with `serde_json`.
///
/// # Errors
///
/// A serialization failure is reported through the
/// `MarketDataError::DeserializationError` variant, carrying the underlying
/// `serde_json` error as `source`.
pub(crate) fn frame_auth(auth: AuthRequest) -> Result<String, MarketDataError> {
    let envelope = WebSocketRequest::auth(auth);
    let encoded = serde_json::to_string(&envelope);
    encoded.map_err(|source| MarketDataError::DeserializationError { source })
}
/// Builds the JSON text frame for a stock-channel subscription.
///
/// Returns the serialized frame together with the result of calling
/// `expand()` on a copy of the wire request (presumably one request per
/// symbol; confirm against `SubscribeRequest::expand`).
///
/// The wire request starts from `SubscribeRequest::default()` and only the
/// following fields are overridden:
///
/// * `channel`: the subscription's channel name;
/// * `symbol` or `symbols`: depending on whether one or many symbols were given;
/// * `intraday_odd_lot`: set to `Some(true)` only when the flag is enabled.
///
/// # Errors
///
/// A serialization failure is reported through the
/// `MarketDataError::DeserializationError` variant.
pub(crate) fn frame_subscribe(
    sub: StockSubscription,
) -> Result<(String, Vec<SubscribeRequest>), MarketDataError> {
    let mut request = SubscribeRequest {
        channel: sub.channel.as_str().to_string(),
        ..Default::default()
    };

    // The flag is left at its default unless explicitly enabled.
    if sub.intraday_odd_lot {
        request.intraday_odd_lot = Some(true);
    }

    match &sub.symbols {
        Symbols::Single(one) => request.symbol = Some(one.clone()),
        Symbols::Many(list) => request.symbols = Some(list.clone()),
    }

    // Expand a copy first: the request itself is moved into the envelope below.
    let expanded = request.clone().expand();
    let envelope = WebSocketRequest::subscribe(request);

    serde_json::to_string(&envelope)
        .map(|json| (json, expanded))
        .map_err(|source| MarketDataError::DeserializationError { source })
}
/// Builds the JSON text frame for a futures/options-channel subscription.
///
/// Returns the serialized frame together with the result of calling
/// `expand()` on a copy of the wire request (presumably one request per
/// symbol; confirm against `SubscribeRequest::expand`).
///
/// The wire request starts from `SubscribeRequest::default()` and only the
/// following fields are overridden:
///
/// * `channel`: the subscription's channel name;
/// * `symbol` or `symbols`: depending on whether one or many symbols were given;
/// * `after_hours`: set to `Some(true)` only when the flag is enabled.
///
/// # Errors
///
/// A serialization failure is reported through the
/// `MarketDataError::DeserializationError` variant.
pub(crate) fn frame_subscribe_futopt(
    sub: FutOptSubscription,
) -> Result<(String, Vec<SubscribeRequest>), MarketDataError> {
    let mut request = SubscribeRequest {
        channel: sub.channel.as_str().to_string(),
        ..Default::default()
    };

    // The flag is left at its default unless explicitly enabled.
    if sub.after_hours {
        request.after_hours = Some(true);
    }

    match &sub.symbols {
        Symbols::Single(one) => request.symbol = Some(one.clone()),
        Symbols::Many(list) => request.symbols = Some(list.clone()),
    }

    // Expand a copy first: the request itself is moved into the envelope below.
    let expanded = request.clone().expand();
    let envelope = WebSocketRequest::subscribe(request);

    serde_json::to_string(&envelope)
        .map(|json| (json, expanded))
        .map_err(|source| MarketDataError::DeserializationError { source })
}
/// Builds the JSON text frame for an already-constructed subscribe request.
///
/// `req` is wrapped with `WebSocketRequest::subscribe` unchanged and
/// serialized with `serde_json`.
///
/// # Errors
///
/// A serialization failure is reported through the
/// `MarketDataError::DeserializationError` variant.
pub(crate) fn frame_subscribe_raw(req: SubscribeRequest) -> Result<String, MarketDataError> {
    let envelope = WebSocketRequest::subscribe(req);
    let encoded = serde_json::to_string(&envelope);
    encoded.map_err(|source| MarketDataError::DeserializationError { source })
}
/// Builds the JSON text frame that unsubscribes the given subscription ids.
///
/// Exactly one id produces an `UnsubscribeRequest::by_id` request; any other
/// count (including zero) produces `UnsubscribeRequest::by_ids` with the whole
/// list.
///
/// # Errors
///
/// A serialization failure is reported through the
/// `MarketDataError::DeserializationError` variant.
pub(crate) fn frame_unsubscribe(wire_ids: Vec<String>) -> Result<String, MarketDataError> {
    let mut ids = wire_ids;
    let request = match ids.len() {
        // Length was just checked, so index 0 is always present here.
        1 => UnsubscribeRequest::by_id(ids.remove(0)),
        _ => UnsubscribeRequest::by_ids(ids),
    };
    let envelope = WebSocketRequest::unsubscribe(request);
    serde_json::to_string(&envelope)
        .map_err(|source| MarketDataError::DeserializationError { source })
}
/// Serializes an arbitrary `WebSocketRequest` into its JSON text frame.
///
/// # Errors
///
/// A serialization failure is reported through the
/// `MarketDataError::DeserializationError` variant.
pub(crate) fn frame_request(req: &WebSocketRequest) -> Result<String, MarketDataError> {
    let encoded = serde_json::to_string(req);
    encoded.map_err(|source| MarketDataError::DeserializationError { source })
}
/// Decodes a JSON text frame into a `WebSocketMessage`.
///
/// # Errors
///
/// Returns `MarketDataError::DeserializationError` when `text` is not valid
/// JSON for a `WebSocketMessage`.
pub(crate) fn parse_text_frame(text: &str) -> Result<WebSocketMessage, MarketDataError> {
    let decoded = serde_json::from_str::<WebSocketMessage>(text);
    decoded.map_err(|source| MarketDataError::DeserializationError { source })
}
/// Decodes a binary frame containing JSON bytes into a `WebSocketMessage`.
///
/// # Errors
///
/// Returns `MarketDataError::DeserializationError` when `data` is not valid
/// JSON for a `WebSocketMessage`.
pub(crate) fn parse_binary_frame(data: &[u8]) -> Result<WebSocketMessage, MarketDataError> {
    let decoded = serde_json::from_slice::<WebSocketMessage>(data);
    decoded.map_err(|source| MarketDataError::DeserializationError { source })
}
/// Classifies a server message with respect to authentication.
///
/// * `Authenticated`: `msg.is_authenticated()` is true (checked first).
/// * `Failed`: `msg.is_error()` is true; carries `msg.error_message()`, or
///   "Unknown error" when the message has none.
/// * `Pending`: neither of the above.
pub(crate) fn classify_auth_response(msg: &WebSocketMessage) -> AuthOutcome {
    if msg.is_authenticated() {
        return AuthOutcome::Authenticated;
    }
    if !msg.is_error() {
        return AuthOutcome::Pending;
    }

    let reason = msg
        .error_message()
        .unwrap_or_else(|| "Unknown error".to_string());
    AuthOutcome::Failed(reason)
}
/// Builds the lookup key for a subscription: `channel:symbol` plus an optional
/// mode suffix.
///
/// The suffix is `:afterhours` when `after_hours` is set, otherwise `:oddlot`
/// when `odd_lot` is set, otherwise nothing. When both flags are set,
/// `after_hours` takes precedence.
fn build_sub_key(channel: &str, symbol: &str, after_hours: bool, odd_lot: bool) -> String {
    let suffix = match (after_hours, odd_lot) {
        (true, _) => ":afterhours",
        (false, true) => ":oddlot",
        (false, false) => "",
    };
    format!("{channel}:{symbol}{suffix}")
}
/// Records the server-assigned subscription ids carried by a `subscribed` event.
///
/// Messages whose `event` is anything other than `"subscribed"` are ignored.
/// Two payload layouts are handled:
///
/// * `data` is an array: every element with string `id`, `channel` and
///   `symbol` fields is recorded; elements missing any of them are skipped.
///   Top-level message fields are not consulted in this layout.
/// * otherwise: `id`, `channel` and `symbol` are read from `data` when it is
///   an object, each falling back to the message's own top-level field.
///   Nothing is recorded unless all three are available.
///
/// In both layouts the optional `afterHours` / `intradayOddLot` booleans
/// (treated as `false` when absent or not booleans) are passed to
/// `build_sub_key` to form the key under which the id is stored via
/// `SubscriptionManager::record_server_id`.
pub(crate) fn handle_subscribed_event(
    subscriptions: &SubscriptionManager,
    msg: &WebSocketMessage,
) {
    if msg.event != "subscribed" {
        return;
    }

    // Batched acknowledgement: one array element per subscription.
    if let Some(entries) = msg.data.as_ref().and_then(|d| d.as_array()) {
        for entry in entries {
            let id = entry.get("id").and_then(|v| v.as_str());
            let channel = entry.get("channel").and_then(|v| v.as_str());
            let symbol = entry.get("symbol").and_then(|v| v.as_str());

            // All three identifiers are required; incomplete entries are skipped.
            let (Some(id), Some(channel), Some(symbol)) = (id, channel, symbol) else {
                continue;
            };

            let after_hours = entry
                .get("afterHours")
                .and_then(|v| v.as_bool())
                .unwrap_or(false);
            let odd_lot = entry
                .get("intradayOddLot")
                .and_then(|v| v.as_bool())
                .unwrap_or(false);

            let key = build_sub_key(channel, symbol, after_hours, odd_lot);
            subscriptions.record_server_id(key, id.to_string());
        }
        // An array payload (even an empty one) never falls through to the
        // single-entry handling below.
        return;
    }

    // Single acknowledgement: fields live in the `data` object and/or at the
    // top level of the message.
    let payload = msg.data.as_ref().and_then(|d| d.as_object());

    // String field from the payload object, else the given top-level value.
    let text_field = |key: &str, fallback: &Option<String>| -> Option<String> {
        payload
            .and_then(|obj| obj.get(key))
            .and_then(|v| v.as_str())
            .map(String::from)
            .or_else(|| fallback.clone())
    };
    // Boolean field from the payload object, defaulting to `false`.
    let flag_field = |key: &str| -> bool {
        payload
            .and_then(|obj| obj.get(key))
            .and_then(|v| v.as_bool())
            .unwrap_or(false)
    };

    let id = text_field("id", &msg.id);
    let channel = text_field("channel", &msg.channel);
    let symbol = text_field("symbol", &msg.symbol);
    let after_hours = flag_field("afterHours");
    let odd_lot = flag_field("intradayOddLot");

    let (Some(id), Some(channel), Some(symbol)) = (id, channel, symbol) else {
        return;
    };
    let key = build_sub_key(&channel, &symbol, after_hours, odd_lot);
    subscriptions.record_server_id(key, id);
}
// Unit tests for `handle_subscribed_event`: each test feeds a JSON frame
// through the handler and then checks which keys the `SubscriptionManager`
// hands back from `take_server_id`.
#[cfg(test)]
mod tests {
use super::*;
// Deserializes a JSON literal into a `WebSocketMessage`; panics if the
// literal does not parse.
fn parse_msg(json: &str) -> WebSocketMessage {
serde_json::from_str(json).unwrap()
}
// A message whose event is not "subscribed" must not record anything, even
// though it carries id/channel/symbol.
#[test]
fn test_handle_subscribed_ignores_non_subscribed() {
let manager = SubscriptionManager::new();
let msg = parse_msg(
r#"{"event":"data","id":"sub-1","channel":"trades","symbol":"2330"}"#,
);
handle_subscribed_event(&manager, &msg);
assert!(manager.take_server_id("trades:2330").is_none());
}
// With no `data` payload, id/channel/symbol are taken from the top-level
// message fields and stored under the plain "channel:symbol" key.
#[test]
fn test_handle_subscribed_single_top_level() {
let manager = SubscriptionManager::new();
let msg = parse_msg(
r#"{"event":"subscribed","id":"sub-abc","channel":"trades","symbol":"2330"}"#,
);
handle_subscribed_event(&manager, &msg);
assert_eq!(
manager.take_server_id("trades:2330"),
Some("sub-abc".to_string())
);
}
// `afterHours: true` in the data object stores the id under the
// ":afterhours" key and leaves the plain key empty.
#[test]
fn test_handle_subscribed_single_with_after_hours() {
let manager = SubscriptionManager::new();
let msg = parse_msg(
r#"{"event":"subscribed","data":{"id":"sub-ah","channel":"books","symbol":"TXFE6","afterHours":true}}"#,
);
handle_subscribed_event(&manager, &msg);
assert_eq!(
manager.take_server_id("books:TXFE6:afterhours"),
Some("sub-ah".to_string())
);
assert!(manager.take_server_id("books:TXFE6").is_none());
}
// `intradayOddLot: true` in the data object stores the id under the
// ":oddlot" key.
#[test]
fn test_handle_subscribed_single_with_odd_lot() {
let manager = SubscriptionManager::new();
let msg = parse_msg(
r#"{"event":"subscribed","data":{"id":"sub-odd","channel":"trades","symbol":"2330","intradayOddLot":true}}"#,
);
handle_subscribed_event(&manager, &msg);
assert_eq!(
manager.take_server_id("trades:2330:oddlot"),
Some("sub-odd".to_string())
);
}
// An array payload records every entry, each under the key matching its
// own flags (plain, after-hours, odd-lot).
#[test]
fn test_handle_subscribed_batched_array() {
let manager = SubscriptionManager::new();
let msg = parse_msg(
r#"{"event":"subscribed","data":[
{"id":"sub-1","channel":"trades","symbol":"2330"},
{"id":"sub-2","channel":"books","symbol":"TXFE6","afterHours":true},
{"id":"sub-3","channel":"trades","symbol":"2317","intradayOddLot":true}
]}"#,
);
handle_subscribed_event(&manager, &msg);
assert_eq!(manager.take_server_id("trades:2330"), Some("sub-1".into()));
assert_eq!(
manager.take_server_id("books:TXFE6:afterhours"),
Some("sub-2".into())
);
assert_eq!(
manager.take_server_id("trades:2317:oddlot"),
Some("sub-3".into())
);
}
// A "subscribed" message lacking id and channel must be a no-op.
#[test]
fn test_handle_subscribed_missing_fields_no_op() {
let manager = SubscriptionManager::new();
let msg = parse_msg(r#"{"event":"subscribed","symbol":"2330"}"#);
handle_subscribed_event(&manager, &msg);
assert!(manager.take_server_id("trades:2330").is_none());
}
}