use std::{
collections::VecDeque,
sync::{
Arc,
atomic::{AtomicBool, AtomicU8, Ordering},
},
};
use ahash::AHashMap;
use dashmap::DashMap;
use nautilus_common::cache::quote::QuoteCache;
use nautilus_core::{UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
use nautilus_model::{
data::{BarType, Data},
events::{OrderCancelRejected, OrderModifyRejected, OrderRejected},
identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
instruments::{Instrument, InstrumentAny},
};
use nautilus_network::{
retry::{RetryManager, create_websocket_retry_manager},
websocket::{AuthTracker, SubscriptionState, WebSocketClient},
};
use tokio_tungstenite::tungstenite::Message;
use ustr::Ustr;
use super::{
enums::BybitWsOperation,
error::{BybitWsError, create_bybit_timeout_error, should_retry_bybit_error},
messages::{
BybitWebSocketError, BybitWsAuthResponse, BybitWsHeader, BybitWsMessage, BybitWsRequest,
BybitWsResponse, BybitWsSubscriptionMsg, NautilusWsMessage,
},
parse::{
parse_millis_i64, parse_orderbook_deltas, parse_orderbook_quote,
parse_ticker_linear_funding, parse_ticker_linear_index_price,
parse_ticker_linear_mark_price, parse_ticker_option_index_price,
parse_ticker_option_mark_price, parse_ws_account_state, parse_ws_fill_report,
parse_ws_kline_bar, parse_ws_order_status_report, parse_ws_position_status_report,
parse_ws_trade_tick,
},
};
use crate::{
common::{
consts::{
BYBIT_NAUTILUS_BROKER_ID, BYBIT_TOPIC_EXECUTION, BYBIT_TOPIC_KLINE, BYBIT_TOPIC_ORDER,
BYBIT_TOPIC_ORDERBOOK, BYBIT_TOPIC_POSITION, BYBIT_TOPIC_PUBLIC_TRADE,
BYBIT_TOPIC_TICKERS, BYBIT_TOPIC_TRADE, BYBIT_TOPIC_WALLET,
},
enums::{BybitProductType, BybitTimeInForce, BybitWsOrderRequestOp},
parse::{make_bybit_symbol, parse_price_with_precision, parse_quantity_with_precision},
},
websocket::messages::{
BybitBatchOrderError, BybitWsAmendOrderParams, BybitWsCancelOrderParams,
BybitWsOrderResponse, BybitWsPlaceOrderParams,
},
};
#[derive(Debug)]
#[allow(
clippy::large_enum_variant,
reason = "Commands are ephemeral and immediately consumed"
)]
pub enum HandlerCommand {
SetClient(WebSocketClient),
Disconnect,
Authenticate {
payload: String,
},
Subscribe {
topics: Vec<String>,
},
Unsubscribe {
topics: Vec<String>,
},
SendText {
payload: String,
},
PlaceOrder {
params: BybitWsPlaceOrderParams,
client_order_id: ClientOrderId,
trader_id: TraderId,
strategy_id: StrategyId,
instrument_id: InstrumentId,
},
AmendOrder {
params: BybitWsAmendOrderParams,
client_order_id: ClientOrderId,
trader_id: TraderId,
strategy_id: StrategyId,
instrument_id: InstrumentId,
venue_order_id: Option<VenueOrderId>,
},
CancelOrder {
params: BybitWsCancelOrderParams,
client_order_id: ClientOrderId,
trader_id: TraderId,
strategy_id: StrategyId,
instrument_id: InstrumentId,
venue_order_id: Option<VenueOrderId>,
},
RegisterBatchPlace {
req_id: String,
orders: Vec<BatchOrderData>,
},
RegisterBatchCancel {
req_id: String,
cancels: Vec<BatchCancelData>,
},
InitializeInstruments(Vec<InstrumentAny>),
UpdateInstrument(InstrumentAny),
}
type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
type PlaceRequestData = (ClientOrderId, TraderId, StrategyId, InstrumentId);
type CancelRequestData = (
ClientOrderId,
TraderId,
StrategyId,
InstrumentId,
Option<VenueOrderId>,
);
type AmendRequestData = (
ClientOrderId,
TraderId,
StrategyId,
InstrumentId,
Option<VenueOrderId>,
);
type BatchOrderData = (ClientOrderId, PlaceRequestData);
type BatchCancelData = (ClientOrderId, CancelRequestData);
pub(super) struct FeedHandler {
signal: Arc<AtomicBool>,
client: Option<WebSocketClient>,
cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
auth_tracker: AuthTracker,
subscriptions: SubscriptionState,
instruments_cache: AHashMap<Ustr, InstrumentAny>,
account_id: Option<AccountId>,
mm_level: Arc<AtomicU8>,
product_type: Option<BybitProductType>,
bars_timestamp_on_close: bool,
quote_cache: QuoteCache,
funding_cache: FundingCache,
bar_types_cache: Arc<DashMap<String, BarType>>,
retry_manager: RetryManager<BybitWsError>,
pending_place_requests: DashMap<String, PlaceRequestData>,
pending_cancel_requests: DashMap<String, CancelRequestData>,
pending_amend_requests: DashMap<String, AmendRequestData>,
pending_batch_place_requests: DashMap<String, Vec<BatchOrderData>>,
pending_batch_cancel_requests: DashMap<String, Vec<BatchCancelData>>,
message_queue: VecDeque<NautilusWsMessage>,
}
impl FeedHandler {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
signal: Arc<AtomicBool>,
cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
account_id: Option<AccountId>,
product_type: Option<BybitProductType>,
bars_timestamp_on_close: bool,
mm_level: Arc<AtomicU8>,
auth_tracker: AuthTracker,
subscriptions: SubscriptionState,
funding_cache: FundingCache,
bar_types_cache: Arc<DashMap<String, BarType>>,
) -> Self {
Self {
signal,
client: None,
cmd_rx,
raw_rx,
out_tx,
auth_tracker,
subscriptions,
instruments_cache: AHashMap::new(),
account_id,
mm_level,
product_type,
bars_timestamp_on_close,
quote_cache: QuoteCache::new(),
funding_cache,
bar_types_cache,
retry_manager: create_websocket_retry_manager(),
pending_place_requests: DashMap::new(),
pending_cancel_requests: DashMap::new(),
pending_amend_requests: DashMap::new(),
pending_batch_place_requests: DashMap::new(),
pending_batch_cancel_requests: DashMap::new(),
message_queue: VecDeque::new(),
}
}
pub(super) fn is_stopped(&self) -> bool {
self.signal.load(Ordering::Relaxed)
}
#[allow(dead_code)]
pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
self.out_tx.send(msg).map_err(|_| ())
}
fn generate_unique_request_id(&self) -> String {
UUID4::new().to_string()
}
fn find_and_remove_place_request_by_client_order_id(
&self,
client_order_id: &ClientOrderId,
) -> Option<(String, PlaceRequestData)> {
self.pending_place_requests
.iter()
.find(|entry| entry.value().0 == *client_order_id)
.and_then(|entry| {
let key = entry.key().clone();
drop(entry);
self.pending_place_requests.remove(&key)
})
}
fn find_and_remove_cancel_request_by_client_order_id(
&self,
client_order_id: &ClientOrderId,
) -> Option<(String, CancelRequestData)> {
self.pending_cancel_requests
.iter()
.find(|entry| entry.value().0 == *client_order_id)
.and_then(|entry| {
let key = entry.key().clone();
drop(entry);
self.pending_cancel_requests.remove(&key)
})
}
fn find_and_remove_amend_request_by_client_order_id(
&self,
client_order_id: &ClientOrderId,
) -> Option<(String, AmendRequestData)> {
self.pending_amend_requests
.iter()
.find(|entry| entry.value().0 == *client_order_id)
.and_then(|entry| {
let key = entry.key().clone();
drop(entry);
self.pending_amend_requests.remove(&key)
})
}
fn include_referer_header(&self, time_in_force: Option<BybitTimeInForce>) -> bool {
let is_post_only = matches!(time_in_force, Some(BybitTimeInForce::PostOnly));
let mm_level = self.mm_level.load(Ordering::Relaxed);
!(is_post_only && mm_level > 0)
}
async fn send_with_retry(&self, payload: String) -> Result<(), BybitWsError> {
if let Some(client) = &self.client {
self.retry_manager
.execute_with_retry(
"websocket_send",
|| {
let payload = payload.clone();
async move {
client
.send_text(payload, None)
.await
.map_err(|e| BybitWsError::Transport(format!("Send failed: {e}")))
}
},
should_retry_bybit_error,
create_bybit_timeout_error,
)
.await
} else {
Err(BybitWsError::ClientError(
"No active WebSocket client".to_string(),
))
}
}
fn handle_batch_failure(
&self,
req_id: &str,
ret_msg: &str,
op: &str,
ts_init: UnixNanos,
result: &mut Vec<NautilusWsMessage>,
) {
if op.contains("create") {
if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
log::warn!(
"Batch place request failed: req_id={req_id}, ret_msg={ret_msg}, num_orders={}",
batch_data.len()
);
let Some(account_id) = self.account_id else {
log::error!("Cannot create OrderRejected events: account_id is None");
return;
};
let reason = Ustr::from(ret_msg);
for (client_order_id, (_, trader_id, strategy_id, instrument_id)) in batch_data {
let rejected = OrderRejected::new(
trader_id,
strategy_id,
instrument_id,
client_order_id,
account_id,
reason,
UUID4::new(),
ts_init,
ts_init,
false,
false,
);
result.push(NautilusWsMessage::OrderRejected(rejected));
}
}
} else if op.contains("cancel")
&& let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id)
{
log::warn!(
"Batch cancel request failed: req_id={req_id}, ret_msg={ret_msg}, num_cancels={}",
batch_data.len()
);
let reason = Ustr::from(ret_msg);
for (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id)) in
batch_data
{
let rejected = OrderCancelRejected::new(
trader_id,
strategy_id,
instrument_id,
client_order_id,
reason,
UUID4::new(),
ts_init,
ts_init,
false,
venue_order_id,
self.account_id,
);
result.push(NautilusWsMessage::OrderCancelRejected(rejected));
}
}
}
fn handle_batch_response(
&self,
resp: &BybitWsOrderResponse,
result: &mut Vec<NautilusWsMessage>,
) {
let Some(req_id) = &resp.req_id else {
log::warn!(
"Batch response missing req_id - cannot correlate with pending requests: op={}",
resp.op
);
return;
};
let batch_errors = resp.extract_batch_errors();
if resp.op.contains("create") {
if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
self.process_batch_place_errors(batch_data, batch_errors, result);
} else {
log::debug!(
"Batch place response received but no pending request found: req_id={req_id}"
);
}
} else if resp.op.contains("cancel") {
if let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id) {
self.process_batch_cancel_errors(batch_data, batch_errors, result);
} else {
log::debug!(
"Batch cancel response received but no pending request found: req_id={req_id}"
);
}
}
}
fn process_batch_place_errors(
&self,
batch_data: Vec<BatchOrderData>,
errors: Vec<BybitBatchOrderError>,
result: &mut Vec<NautilusWsMessage>,
) {
let Some(account_id) = self.account_id else {
log::error!("Cannot create OrderRejected events: account_id is None");
return;
};
let clock = get_atomic_clock_realtime();
let ts_init = clock.get_time_ns();
for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id))) in
batch_data.into_iter().enumerate()
{
if let Some(error) = errors.get(idx)
&& error.code != 0
{
log::warn!(
"Batch order rejected: client_order_id={client_order_id}, error_code={}, error_msg={}",
error.code,
error.msg
);
let rejected = OrderRejected::new(
trader_id,
strategy_id,
instrument_id,
client_order_id,
account_id,
Ustr::from(&error.msg),
UUID4::new(),
ts_init,
ts_init,
false,
false,
);
result.push(NautilusWsMessage::OrderRejected(rejected));
}
}
}
fn process_batch_cancel_errors(
&self,
batch_data: Vec<BatchCancelData>,
errors: Vec<BybitBatchOrderError>,
result: &mut Vec<NautilusWsMessage>,
) {
let clock = get_atomic_clock_realtime();
let ts_init = clock.get_time_ns();
for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id))) in
batch_data.into_iter().enumerate()
{
if let Some(error) = errors.get(idx)
&& error.code != 0
{
log::warn!(
"Batch cancel rejected: client_order_id={client_order_id}, error_code={}, error_msg={}",
error.code,
error.msg
);
let rejected = OrderCancelRejected::new(
trader_id,
strategy_id,
instrument_id,
client_order_id,
Ustr::from(&error.msg),
UUID4::new(),
ts_init,
ts_init,
false,
venue_order_id,
self.account_id,
);
result.push(NautilusWsMessage::OrderCancelRejected(rejected));
}
}
}
pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
let clock = get_atomic_clock_realtime();
loop {
if let Some(msg) = self.message_queue.pop_front() {
return Some(msg);
}
tokio::select! {
Some(cmd) = self.cmd_rx.recv() => {
match cmd {
HandlerCommand::SetClient(client) => {
log::debug!("WebSocketClient received by handler");
self.client = Some(client);
}
HandlerCommand::Disconnect => {
log::debug!("Disconnect command received");
if let Some(client) = self.client.take() {
client.disconnect().await;
}
}
HandlerCommand::Authenticate { payload } => {
log::debug!("Authenticate command received");
if let Err(e) = self.send_with_retry(payload).await {
log::error!("Failed to send authentication after retries: {e}");
}
}
HandlerCommand::Subscribe { topics } => {
for topic in topics {
log::debug!("Subscribing to topic: topic={topic}");
if let Err(e) = self.send_with_retry(topic.clone()).await {
log::error!("Failed to send subscription after retries: topic={topic}, error={e}");
}
}
}
HandlerCommand::Unsubscribe { topics } => {
for topic in topics {
log::debug!("Unsubscribing from topic: topic={topic}");
if let Err(e) = self.send_with_retry(topic.clone()).await {
log::error!("Failed to send unsubscription after retries: topic={topic}, error={e}");
}
}
}
HandlerCommand::SendText { payload } => {
if let Err(e) = self.send_with_retry(payload).await {
log::error!("Error sending text with retry: {e}");
}
}
HandlerCommand::InitializeInstruments(instruments) => {
for inst in instruments {
self.instruments_cache.insert(inst.symbol().inner(), inst);
}
}
HandlerCommand::UpdateInstrument(inst) => {
self.instruments_cache.insert(inst.symbol().inner(), inst);
}
HandlerCommand::RegisterBatchPlace { req_id, orders } => {
log::debug!(
"Registering batch place request: req_id={req_id}, num_orders={}",
orders.len()
);
self.pending_batch_place_requests.insert(req_id, orders);
}
HandlerCommand::RegisterBatchCancel { req_id, cancels } => {
log::debug!(
"Registering batch cancel request: req_id={req_id}, num_cancels={}",
cancels.len()
);
self.pending_batch_cancel_requests.insert(req_id, cancels);
}
HandlerCommand::PlaceOrder {
params,
client_order_id,
trader_id,
strategy_id,
instrument_id,
} => {
let request_id = self.generate_unique_request_id();
self.pending_place_requests.insert(
request_id.clone(),
(client_order_id, trader_id, strategy_id, instrument_id),
);
let referer = if self.include_referer_header(params.time_in_force) {
Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
} else {
None
};
let request = BybitWsRequest {
req_id: Some(request_id.clone()),
op: BybitWsOrderRequestOp::Create,
header: BybitWsHeader::with_referer(referer),
args: vec![params],
};
if let Ok(payload) = serde_json::to_string(&request)
&& let Err(e) = self.send_with_retry(payload).await
{
log::error!("Failed to send place order after retries: {e}");
self.pending_place_requests.remove(&request_id);
}
}
HandlerCommand::AmendOrder {
params,
client_order_id,
trader_id,
strategy_id,
instrument_id,
venue_order_id,
} => {
let request_id = self.generate_unique_request_id();
self.pending_amend_requests.insert(
request_id.clone(),
(client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
);
let request = BybitWsRequest {
req_id: Some(request_id.clone()),
op: BybitWsOrderRequestOp::Amend,
header: BybitWsHeader::now(),
args: vec![params],
};
if let Ok(payload) = serde_json::to_string(&request)
&& let Err(e) = self.send_with_retry(payload).await
{
log::error!("Failed to send amend order after retries: {e}");
self.pending_amend_requests.remove(&request_id);
}
}
HandlerCommand::CancelOrder {
params,
client_order_id,
trader_id,
strategy_id,
instrument_id,
venue_order_id,
} => {
let request_id = self.generate_unique_request_id();
self.pending_cancel_requests.insert(
request_id.clone(),
(client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
);
let request = BybitWsRequest {
req_id: Some(request_id.clone()),
op: BybitWsOrderRequestOp::Cancel,
header: BybitWsHeader::now(),
args: vec![params],
};
if let Ok(payload) = serde_json::to_string(&request)
&& let Err(e) = self.send_with_retry(payload).await
{
log::error!("Failed to send cancel order after retries: {e}");
self.pending_cancel_requests.remove(&request_id);
}
}
}
continue;
}
() = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
if self.signal.load(Ordering::Relaxed) {
log::debug!("Stop signal received during idle period");
return None;
}
continue;
}
msg = self.raw_rx.recv() => {
let msg = match msg {
Some(msg) => msg,
None => {
log::debug!("WebSocket stream closed");
return None;
}
};
if let Message::Ping(data) = &msg {
log::trace!("Received ping frame with {} bytes", data.len());
if let Some(client) = &self.client
&& let Err(e) = client.send_pong(data.to_vec()).await
{
log::warn!("Failed to send pong frame: error={e}");
}
continue;
}
let event = match Self::parse_raw_message(msg) {
Some(event) => event,
None => continue,
};
if self.signal.load(Ordering::Relaxed) {
log::debug!("Stop signal received");
return None;
}
let ts_init = clock.get_time_ns();
let instruments = self.instruments_cache.clone();
let funding_cache = Arc::clone(&self.funding_cache);
let nautilus_messages = self.parse_to_nautilus_messages(
event,
&instruments,
self.account_id,
self.product_type,
&funding_cache,
ts_init,
)
.await;
self.message_queue.extend(nautilus_messages);
}
}
}
}
fn parse_raw_message(msg: Message) -> Option<BybitWsMessage> {
use serde_json::Value;
match msg {
Message::Text(text) => {
if text == nautilus_network::RECONNECTED {
log::info!("Received WebSocket reconnected signal");
return Some(BybitWsMessage::Reconnected);
}
if text.trim().eq_ignore_ascii_case("pong") {
return None;
}
log::trace!("Raw websocket message: {text}");
let value: Value = match serde_json::from_str(&text) {
Ok(v) => v,
Err(e) => {
log::error!("Failed to parse WebSocket message: {e}: {text}");
return None;
}
};
Some(classify_bybit_message(value))
}
Message::Binary(msg) => {
log::debug!("Raw binary: {msg:?}");
None
}
Message::Close(_) => {
log::debug!("Received close message, waiting for reconnection");
None
}
_ => None,
}
}
#[allow(clippy::too_many_arguments)]
async fn parse_to_nautilus_messages(
&mut self,
msg: BybitWsMessage,
instruments: &AHashMap<Ustr, InstrumentAny>,
account_id: Option<AccountId>,
product_type: Option<BybitProductType>,
funding_cache: &FundingCache,
ts_init: UnixNanos,
) -> Vec<NautilusWsMessage> {
let mut result = Vec::new();
match msg {
BybitWsMessage::Orderbook(msg) => {
let raw_symbol = msg.data.s;
let symbol =
product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
if let Some(instrument) = instruments.get(&symbol) {
match parse_orderbook_deltas(&msg, instrument, ts_init) {
Ok(deltas) => result.push(NautilusWsMessage::Deltas(deltas)),
Err(e) => log::error!("Error parsing orderbook deltas: {e}"),
}
if let Some(depth_str) = msg.topic.as_str().split('.').nth(1)
&& depth_str == "1"
{
let instrument_id = instrument.id();
let last_quote = self.quote_cache.get(&instrument_id);
match parse_orderbook_quote(&msg, instrument, last_quote, ts_init) {
Ok(quote) => {
self.quote_cache.insert(instrument_id, quote);
result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
}
Err(e) => log::debug!("Skipping orderbook quote: {e}"),
}
}
} else {
log::debug!(
"No instrument found for symbol in Orderbook message: raw_symbol={raw_symbol}, full_symbol={symbol}"
);
}
}
BybitWsMessage::Trade(msg) => {
let mut data_vec = Vec::new();
for trade in &msg.data {
let raw_symbol = trade.s;
let symbol =
product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
if let Some(instrument) = instruments.get(&symbol) {
match parse_ws_trade_tick(trade, instrument, ts_init) {
Ok(tick) => data_vec.push(Data::Trade(tick)),
Err(e) => log::error!("Error parsing trade tick: {e}"),
}
} else {
log::debug!(
"No instrument found for symbol in Trade message: raw_symbol={raw_symbol}, full_symbol={symbol}"
);
}
}
if !data_vec.is_empty() {
result.push(NautilusWsMessage::Data(data_vec));
}
}
BybitWsMessage::Kline(msg) => {
let bar_type = match self.bar_types_cache.get(msg.topic.as_str()) {
Some(bt) => *bt,
None => {
log::debug!("No bar type subscription found for topic: {}", msg.topic);
return result;
}
};
let symbol = Ustr::from(bar_type.instrument_id().symbol.as_str());
let instrument = match instruments.get(&symbol) {
Some(inst) => inst,
None => {
log::debug!(
"No instrument found for bar type: {}",
bar_type.instrument_id()
);
return result;
}
};
let mut data_vec = Vec::new();
for kline in &msg.data {
if !kline.confirm {
continue;
}
match parse_ws_kline_bar(
kline,
instrument,
bar_type,
self.bars_timestamp_on_close,
ts_init,
) {
Ok(bar) => data_vec.push(Data::Bar(bar)),
Err(e) => log::error!("Error parsing kline to bar: {e}"),
}
}
if !data_vec.is_empty() {
result.push(NautilusWsMessage::Data(data_vec));
}
}
BybitWsMessage::TickerLinear(msg) => {
let raw_symbol = msg.data.symbol;
let symbol =
product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
if let Some(instrument) = instruments.get(&symbol) {
let instrument_id = instrument.id();
let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
let price_precision = instrument.price_precision();
let size_precision = instrument.size_precision();
let bid_price = msg
.data
.bid1_price
.as_deref()
.map(|s| parse_price_with_precision(s, price_precision, "bid1Price"))
.transpose();
let ask_price = msg
.data
.ask1_price
.as_deref()
.map(|s| parse_price_with_precision(s, price_precision, "ask1Price"))
.transpose();
let bid_size = msg
.data
.bid1_size
.as_deref()
.map(|s| parse_quantity_with_precision(s, size_precision, "bid1Size"))
.transpose();
let ask_size = msg
.data
.ask1_size
.as_deref()
.map(|s| parse_quantity_with_precision(s, size_precision, "ask1Size"))
.transpose();
match (bid_price, ask_price, bid_size, ask_size) {
(Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
match self.quote_cache.process(
instrument_id,
bp,
ap,
bs,
as_,
ts_event,
ts_init,
) {
Ok(quote) => {
result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
}
Err(e) => {
let raw_data = serde_json::to_string(&msg.data)
.unwrap_or_else(|_| "<failed to serialize>".to_string());
log::debug!(
"Skipping partial ticker update: {e}, raw_data: {raw_data}"
);
}
}
}
_ => {
let raw_data = serde_json::to_string(&msg.data)
.unwrap_or_else(|_| "<failed to serialize>".to_string());
log::warn!(
"Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
);
}
}
if msg.data.funding_rate.is_some() && msg.data.next_funding_time.is_some() {
let should_publish = {
let cache = funding_cache.read().await;
match cache.get(&symbol) {
Some((cached_rate, cached_time)) => {
cached_rate.as_ref() != msg.data.funding_rate.as_ref()
|| cached_time.as_ref()
!= msg.data.next_funding_time.as_ref()
}
None => true,
}
};
if should_publish {
match parse_ticker_linear_funding(
&msg.data,
instrument_id,
ts_event,
ts_init,
) {
Ok(funding) => {
let cache_key = (
msg.data.funding_rate.clone(),
msg.data.next_funding_time.clone(),
);
funding_cache.write().await.insert(symbol, cache_key);
result.push(NautilusWsMessage::FundingRates(vec![funding]));
}
Err(e) => {
log::debug!("Skipping funding rate update: {e}");
}
}
}
}
if msg.data.mark_price.is_some() {
match parse_ticker_linear_mark_price(
&msg.data, instrument, ts_event, ts_init,
) {
Ok(mark_price) => {
result.push(NautilusWsMessage::MarkPrices(vec![mark_price]));
}
Err(e) => {
log::debug!("Skipping mark price update: {e}");
}
}
}
if msg.data.index_price.is_some() {
match parse_ticker_linear_index_price(
&msg.data, instrument, ts_event, ts_init,
) {
Ok(index_price) => {
result.push(NautilusWsMessage::IndexPrices(vec![index_price]));
}
Err(e) => {
log::debug!("Skipping index price update: {e}");
}
}
}
} else {
log::debug!(
"No instrument found for symbol in TickerLinear message: raw_symbol={raw_symbol}, full_symbol={symbol}"
);
}
}
BybitWsMessage::TickerOption(msg) => {
let raw_symbol = &msg.data.symbol;
let symbol = product_type.map_or_else(
|| raw_symbol.as_str().into(),
|pt| make_bybit_symbol(raw_symbol, pt),
);
if let Some(instrument) = instruments.get(&symbol) {
let instrument_id = instrument.id();
let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
let price_precision = instrument.price_precision();
let size_precision = instrument.size_precision();
let bid_price = parse_price_with_precision(
&msg.data.bid_price,
price_precision,
"bidPrice",
);
let ask_price = parse_price_with_precision(
&msg.data.ask_price,
price_precision,
"askPrice",
);
let bid_size = parse_quantity_with_precision(
&msg.data.bid_size,
size_precision,
"bidSize",
);
let ask_size = parse_quantity_with_precision(
&msg.data.ask_size,
size_precision,
"askSize",
);
match (bid_price, ask_price, bid_size, ask_size) {
(Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
match self.quote_cache.process(
instrument_id,
Some(bp),
Some(ap),
Some(bs),
Some(as_),
ts_event,
ts_init,
) {
Ok(quote) => {
result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
}
Err(e) => {
let raw_data = serde_json::to_string(&msg.data)
.unwrap_or_else(|_| "<failed to serialize>".to_string());
log::debug!(
"Skipping partial ticker update: {e}, raw_data: {raw_data}"
);
}
}
}
_ => {
let raw_data = serde_json::to_string(&msg.data)
.unwrap_or_else(|_| "<failed to serialize>".to_string());
log::warn!(
"Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
);
}
}
match parse_ticker_option_mark_price(&msg, instrument, ts_init) {
Ok(mark_price) => {
result.push(NautilusWsMessage::MarkPrices(vec![mark_price]));
}
Err(e) => {
log::debug!("Skipping option mark price update: {e}");
}
}
match parse_ticker_option_index_price(&msg, instrument, ts_init) {
Ok(index_price) => {
result.push(NautilusWsMessage::IndexPrices(vec![index_price]));
}
Err(e) => {
log::debug!("Skipping option index price update: {e}");
}
}
} else {
log::debug!(
"No instrument found for symbol in TickerOption message: raw_symbol={raw_symbol}, full_symbol={symbol}"
);
}
}
BybitWsMessage::AccountOrder(msg) => {
if let Some(account_id) = account_id {
let mut reports = Vec::new();
for order in &msg.data {
let raw_symbol = order.symbol;
let symbol = make_bybit_symbol(raw_symbol, order.category);
if let Some(instrument) = instruments.get(&symbol) {
match parse_ws_order_status_report(
order, instrument, account_id, ts_init,
) {
Ok(report) => reports.push(report),
Err(e) => log::error!("Error parsing order status report: {e}"),
}
} else {
log::debug!(
"No instrument found for symbol in AccountOrder message: raw_symbol={raw_symbol}, full_symbol={symbol}"
);
}
}
if !reports.is_empty() {
result.push(NautilusWsMessage::OrderStatusReports(reports));
}
}
}
BybitWsMessage::AccountExecution(msg) => {
if let Some(account_id) = account_id {
let mut reports = Vec::new();
for execution in &msg.data {
let raw_symbol = execution.symbol;
let symbol = make_bybit_symbol(raw_symbol, execution.category);
if let Some(instrument) = instruments.get(&symbol) {
match parse_ws_fill_report(execution, account_id, instrument, ts_init) {
Ok(report) => reports.push(report),
Err(e) => log::error!("Error parsing fill report: {e}"),
}
} else {
log::debug!(
"No instrument found for symbol in AccountExecution message: raw_symbol={raw_symbol}, full_symbol={symbol}"
);
}
}
if !reports.is_empty() {
result.push(NautilusWsMessage::FillReports(reports));
}
}
}
BybitWsMessage::AccountPosition(msg) => {
if let Some(account_id) = account_id {
for position in &msg.data {
let raw_symbol = position.symbol;
let symbol = make_bybit_symbol(raw_symbol, position.category);
if let Some(instrument) = instruments.get(&symbol) {
match parse_ws_position_status_report(
position, account_id, instrument, ts_init,
) {
Ok(report) => {
result.push(NautilusWsMessage::PositionStatusReport(report));
}
Err(e) => {
log::error!("Error parsing position status report: {e}");
}
}
} else {
log::debug!(
"No instrument found for symbol in AccountPosition message: raw_symbol={raw_symbol}, full_symbol={symbol}"
);
}
}
}
}
BybitWsMessage::AccountWallet(msg) => {
if let Some(account_id) = account_id {
for wallet in &msg.data {
let ts_event = UnixNanos::from(msg.creation_time as u64 * 1_000_000);
match parse_ws_account_state(wallet, account_id, ts_event, ts_init) {
Ok(state) => result.push(NautilusWsMessage::AccountState(state)),
Err(e) => log::error!("Error parsing account state: {e}"),
}
}
}
}
BybitWsMessage::OrderResponse(resp) => {
if resp.ret_code == 0 {
log::debug!(
"Order operation successful: op={}, ret_msg={}",
resp.op,
resp.ret_msg
);
if resp.op.contains("batch") {
self.handle_batch_response(&resp, &mut result);
} else if let Some(req_id) = &resp.req_id {
if resp.op.contains("create") {
self.pending_place_requests.remove(req_id);
} else if resp.op.contains("cancel") {
self.pending_cancel_requests.remove(req_id);
} else if resp.op.contains("amend") {
self.pending_amend_requests.remove(req_id);
}
} else if let Some(order_link_id) =
resp.data.get("orderLinkId").and_then(|v| v.as_str())
{
let client_order_id = ClientOrderId::from(order_link_id);
if resp.op.contains("create") {
self.find_and_remove_place_request_by_client_order_id(&client_order_id);
} else if resp.op.contains("cancel") {
self.find_and_remove_cancel_request_by_client_order_id(
&client_order_id,
);
} else if resp.op.contains("amend") {
self.find_and_remove_amend_request_by_client_order_id(&client_order_id);
}
}
} else if let Some(req_id) = &resp.req_id {
let clock = get_atomic_clock_realtime();
let ts_init = clock.get_time_ns();
if resp.op.contains("batch") {
self.handle_batch_failure(
req_id,
&resp.ret_msg,
&resp.op,
ts_init,
&mut result,
);
} else if resp.op.contains("create")
&& let Some((_, (client_order_id, trader_id, strategy_id, instrument_id))) =
self.pending_place_requests.remove(req_id)
{
let Some(account_id) = self.account_id else {
log::error!(
"Cannot create OrderRejected event: account_id is None: request_id={req_id}, reason={}",
resp.ret_msg
);
return result;
};
let rejected = OrderRejected::new(
trader_id,
strategy_id,
instrument_id,
client_order_id,
account_id,
Ustr::from(&resp.ret_msg),
UUID4::new(),
ts_init,
ts_init,
false,
false,
);
result.push(NautilusWsMessage::OrderRejected(rejected));
} else if resp.op.contains("cancel")
&& let Some((
_,
(
client_order_id,
trader_id,
strategy_id,
instrument_id,
venue_order_id,
),
)) = self.pending_cancel_requests.remove(req_id)
{
let rejected = OrderCancelRejected::new(
trader_id,
strategy_id,
instrument_id,
client_order_id,
Ustr::from(&resp.ret_msg),
UUID4::new(),
ts_init,
ts_init,
false,
venue_order_id,
self.account_id,
);
result.push(NautilusWsMessage::OrderCancelRejected(rejected));
} else if resp.op.contains("amend")
&& let Some((
_,
(
client_order_id,
trader_id,
strategy_id,
instrument_id,
venue_order_id,
),
)) = self.pending_amend_requests.remove(req_id)
{
let rejected = OrderModifyRejected::new(
trader_id,
strategy_id,
instrument_id,
client_order_id,
Ustr::from(&resp.ret_msg),
UUID4::new(),
ts_init,
ts_init,
false,
venue_order_id,
self.account_id,
);
result.push(NautilusWsMessage::OrderModifyRejected(rejected));
}
} else if let Some(order_link_id) =
resp.data.get("orderLinkId").and_then(|v| v.as_str())
{
let clock = get_atomic_clock_realtime();
let ts_init = clock.get_time_ns();
let client_order_id = ClientOrderId::from(order_link_id);
if resp.op.contains("create") {
if let Some((_, (_, trader_id, strategy_id, instrument_id))) =
self.find_and_remove_place_request_by_client_order_id(&client_order_id)
{
let Some(account_id) = self.account_id else {
log::error!(
"Cannot create OrderRejected event: account_id is None: client_order_id={client_order_id}, reason={}",
resp.ret_msg
);
return result;
};
let rejected = OrderRejected::new(
trader_id,
strategy_id,
instrument_id,
client_order_id,
account_id,
Ustr::from(&resp.ret_msg),
UUID4::new(),
ts_init,
ts_init,
false,
false,
);
result.push(NautilusWsMessage::OrderRejected(rejected));
}
} else if resp.op.contains("cancel") {
if let Some((
_,
(_, trader_id, strategy_id, instrument_id, venue_order_id),
)) =
self.find_and_remove_cancel_request_by_client_order_id(&client_order_id)
{
let rejected = OrderCancelRejected::new(
trader_id,
strategy_id,
instrument_id,
client_order_id,
Ustr::from(&resp.ret_msg),
UUID4::new(),
ts_init,
ts_init,
false,
venue_order_id,
self.account_id,
);
result.push(NautilusWsMessage::OrderCancelRejected(rejected));
}
} else if resp.op.contains("amend")
&& let Some((_, (_, trader_id, strategy_id, instrument_id, venue_order_id))) =
self.find_and_remove_amend_request_by_client_order_id(&client_order_id)
{
let rejected = OrderModifyRejected::new(
trader_id,
strategy_id,
instrument_id,
client_order_id,
Ustr::from(&resp.ret_msg),
UUID4::new(),
ts_init,
ts_init,
false,
venue_order_id,
self.account_id,
);
result.push(NautilusWsMessage::OrderModifyRejected(rejected));
}
} else {
log::warn!(
"Order operation failed but request_id could not be extracted from response: op={}, ret_code={}, ret_msg={}",
resp.op,
resp.ret_code,
resp.ret_msg
);
}
}
BybitWsMessage::Auth(auth_response) => {
let is_success =
auth_response.success.unwrap_or(false) || (auth_response.ret_code == Some(0));
if is_success {
self.auth_tracker.succeed();
log::info!("WebSocket authenticated");
result.push(NautilusWsMessage::Authenticated);
} else {
let error_msg = auth_response
.ret_msg
.as_deref()
.unwrap_or("Authentication rejected");
self.auth_tracker.fail(error_msg);
log::error!("WebSocket authentication failed: error={error_msg}");
result.push(NautilusWsMessage::Error(BybitWebSocketError::from_message(
error_msg.to_string(),
)));
}
}
BybitWsMessage::Error(err) => {
result.push(NautilusWsMessage::Error(err));
}
BybitWsMessage::Reconnected => {
self.quote_cache.clear();
result.push(NautilusWsMessage::Reconnected);
}
BybitWsMessage::Subscription(sub_msg) => {
let pending_topics = self.subscriptions.pending_subscribe_topics();
match sub_msg.op {
BybitWsOperation::Subscribe => {
if sub_msg.success {
for topic in pending_topics {
self.subscriptions.confirm_subscribe(&topic);
log::debug!("Subscription confirmed: topic={topic}");
}
} else {
for topic in pending_topics {
self.subscriptions.mark_failure(&topic);
log::warn!(
"Subscription failed, will retry on reconnect: topic={topic}, error={:?}",
sub_msg.ret_msg
);
}
}
}
BybitWsOperation::Unsubscribe => {
let pending_unsub = self.subscriptions.pending_unsubscribe_topics();
if sub_msg.success {
for topic in pending_unsub {
self.subscriptions.confirm_unsubscribe(&topic);
log::debug!("Unsubscription confirmed: topic={topic}");
}
} else {
for topic in pending_unsub {
log::warn!(
"Unsubscription failed: topic={topic}, error={:?}",
sub_msg.ret_msg
);
}
}
}
_ => {}
}
}
_ => {}
}
result
}
}
pub fn classify_bybit_message(value: serde_json::Value) -> BybitWsMessage {
if let Some(op_val) = value.get("op") {
if let Ok(op) = serde_json::from_value::<BybitWsOperation>(op_val.clone())
&& op == BybitWsOperation::Auth
&& let Ok(auth) = serde_json::from_value::<BybitWsAuthResponse>(value.clone())
{
let is_success = auth.success.unwrap_or(false) || auth.ret_code.unwrap_or(-1) == 0;
if is_success {
return BybitWsMessage::Auth(auth);
}
let resp = BybitWsResponse {
op: Some(auth.op.clone()),
topic: None,
success: auth.success,
conn_id: auth.conn_id.clone(),
req_id: None,
ret_code: auth.ret_code,
ret_msg: auth.ret_msg,
};
let error = BybitWebSocketError::from_response(&resp);
return BybitWsMessage::Error(error);
}
if let Some(op_str) = op_val.as_str()
&& op_str.starts_with("order.")
{
return serde_json::from_value::<BybitWsOrderResponse>(value.clone()).map_or_else(
|_| BybitWsMessage::Raw(value),
BybitWsMessage::OrderResponse,
);
}
}
if let Some(success) = value.get("success").and_then(serde_json::Value::as_bool) {
if success {
return serde_json::from_value::<BybitWsSubscriptionMsg>(value.clone())
.map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Subscription);
}
return serde_json::from_value::<BybitWsResponse>(value.clone()).map_or_else(
|_| BybitWsMessage::Raw(value),
|resp| {
let error = BybitWebSocketError::from_response(&resp);
BybitWsMessage::Error(error)
},
);
}
if let Some(topic) = value.get("topic").and_then(serde_json::Value::as_str) {
if topic.starts_with(BYBIT_TOPIC_ORDERBOOK) {
return serde_json::from_value(value.clone())
.map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Orderbook);
}
if topic.contains(BYBIT_TOPIC_PUBLIC_TRADE) || topic.starts_with(BYBIT_TOPIC_TRADE) {
return serde_json::from_value(value.clone())
.map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Trade);
}
if topic.starts_with(BYBIT_TOPIC_KLINE) {
return serde_json::from_value(value.clone())
.map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Kline);
}
if topic.starts_with(BYBIT_TOPIC_TICKERS) {
let is_option = value
.get("data")
.and_then(|d| d.get("symbol"))
.and_then(|s| s.as_str())
.is_some_and(|symbol| symbol.contains('-') && symbol.matches('-').count() >= 3);
if is_option {
return serde_json::from_value(value.clone())
.map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerOption);
}
return serde_json::from_value(value.clone())
.map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerLinear);
}
if topic.starts_with(BYBIT_TOPIC_ORDER) {
return serde_json::from_value(value.clone())
.map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::AccountOrder);
}
if topic.starts_with(BYBIT_TOPIC_EXECUTION) {
return serde_json::from_value(value.clone()).map_or_else(
|_| BybitWsMessage::Raw(value),
BybitWsMessage::AccountExecution,
);
}
if topic.starts_with(BYBIT_TOPIC_WALLET) {
return serde_json::from_value(value.clone()).map_or_else(
|_| BybitWsMessage::Raw(value),
BybitWsMessage::AccountWallet,
);
}
if topic.starts_with(BYBIT_TOPIC_POSITION) {
return serde_json::from_value(value.clone()).map_or_else(
|_| BybitWsMessage::Raw(value),
BybitWsMessage::AccountPosition,
);
}
}
BybitWsMessage::Raw(value)
}
#[cfg(test)]
mod tests {
use rstest::rstest;
use super::*;
use crate::common::consts::BYBIT_WS_TOPIC_DELIMITER;
fn create_test_handler() -> FeedHandler {
let signal = Arc::new(AtomicBool::new(false));
let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
let (out_tx, _out_rx) = tokio::sync::mpsc::unbounded_channel();
let auth_tracker = AuthTracker::new();
let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
let funding_cache = Arc::new(tokio::sync::RwLock::new(AHashMap::new()));
let bar_types_cache = Arc::new(DashMap::new());
FeedHandler::new(
signal,
cmd_rx,
raw_rx,
out_tx,
None,
None,
true,
Arc::new(AtomicU8::new(0)),
auth_tracker,
subscriptions,
funding_cache,
bar_types_cache,
)
}
#[rstest]
fn test_generate_unique_request_id_returns_different_ids() {
let handler = create_test_handler();
let id1 = handler.generate_unique_request_id();
let id2 = handler.generate_unique_request_id();
let id3 = handler.generate_unique_request_id();
assert_ne!(id1, id2);
assert_ne!(id2, id3);
assert_ne!(id1, id3);
}
#[rstest]
fn test_generate_unique_request_id_produces_valid_uuids() {
let handler = create_test_handler();
let id1 = handler.generate_unique_request_id();
let id2 = handler.generate_unique_request_id();
assert!(UUID4::from(id1.as_str()).to_string() == id1);
assert!(UUID4::from(id2.as_str()).to_string() == id2);
}
#[rstest]
fn test_multiple_place_orders_use_different_request_ids() {
let handler = create_test_handler();
let req_id_1 = handler.generate_unique_request_id();
let req_id_2 = handler.generate_unique_request_id();
let req_id_3 = handler.generate_unique_request_id();
assert_ne!(req_id_1, req_id_2);
assert_ne!(req_id_2, req_id_3);
assert_ne!(req_id_1, req_id_3);
}
#[rstest]
fn test_multiple_amends_use_different_request_ids() {
let handler = create_test_handler();
let req_id_1 = handler.generate_unique_request_id();
let req_id_2 = handler.generate_unique_request_id();
let req_id_3 = handler.generate_unique_request_id();
assert_ne!(
req_id_1, req_id_2,
"Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
);
assert_ne!(
req_id_2, req_id_3,
"Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
);
}
#[rstest]
fn test_multiple_cancels_use_different_request_ids() {
let handler = create_test_handler();
let req_id_1 = handler.generate_unique_request_id();
let req_id_2 = handler.generate_unique_request_id();
assert_ne!(
req_id_1, req_id_2,
"Multiple cancels should generate different request IDs"
);
}
#[rstest]
fn test_concurrent_request_id_generation() {
let handler = create_test_handler();
let mut ids = std::collections::HashSet::new();
for _ in 0..100 {
let id = handler.generate_unique_request_id();
assert!(
ids.insert(id.clone()),
"Generated duplicate request ID: {id}"
);
}
assert_eq!(ids.len(), 100);
}
}