use std::{num::NonZero, str::FromStr};
use ahash::AHashMap;
use chrono::Timelike;
use nautilus_core::{UnixNanos, uuid::UUID4};
#[cfg(test)]
use nautilus_model::types::Currency;
use nautilus_model::{
data::{
Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
depth::DEPTH10_LEN,
},
enums::{
AccountType, AggregationSource, BarAggregation, OrderSide, OrderStatus, OrderType,
PriceType, RecordFlag, TimeInForce, TrailingOffsetType,
},
events::{
OrderAccepted, OrderCanceled, OrderExpired, OrderRejected, OrderTriggered, OrderUpdated,
account::state::AccountState,
},
identifiers::{
AccountId, ClientOrderId, InstrumentId, OrderListId, StrategyId, Symbol, TradeId, TraderId,
VenueOrderId,
},
instruments::{Instrument, InstrumentAny},
reports::{FillReport, OrderStatusReport, PositionStatusReport},
types::{AccountBalance, MarginBalance, Money, Price, Quantity},
};
use rust_decimal::Decimal;
use ustr::Ustr;
use uuid::Uuid;
use super::{
enums::{BitmexAction, BitmexWsTopic},
messages::{
BitmexExecutionMsg, BitmexFundingMsg, BitmexInstrumentMsg, BitmexMarginMsg,
BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexOrderMsg, BitmexPositionMsg,
BitmexQuoteMsg, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg,
},
};
use crate::{
common::{
consts::BITMEX_VENUE,
enums::{
BitmexExecInstruction, BitmexExecType, BitmexOrderStatus, BitmexOrderType,
BitmexPegPriceType, BitmexSide,
},
parse::{
bitmex_currency_divisor, clean_reason, extract_trigger_type, map_bitmex_currency,
normalize_trade_bin_prices, normalize_trade_bin_volume, parse_account_balance,
parse_contracts_quantity, parse_fractional_quantity, parse_instrument_id,
parse_liquidity_side, parse_optional_datetime_to_unix_nanos, parse_position_side,
parse_signed_contracts_quantity,
},
},
http::parse::get_currency,
websocket::messages::BitmexOrderUpdateMsg,
};
const BAR_SPEC_1_MINUTE: BarSpecification = BarSpecification {
step: NonZero::new(1).expect("1 is a valid non-zero usize"),
aggregation: BarAggregation::Minute,
price_type: PriceType::Last,
};
const BAR_SPEC_5_MINUTE: BarSpecification = BarSpecification {
step: NonZero::new(5).expect("5 is a valid non-zero usize"),
aggregation: BarAggregation::Minute,
price_type: PriceType::Last,
};
const BAR_SPEC_1_HOUR: BarSpecification = BarSpecification {
step: NonZero::new(1).expect("1 is a valid non-zero usize"),
aggregation: BarAggregation::Hour,
price_type: PriceType::Last,
};
const BAR_SPEC_1_DAY: BarSpecification = BarSpecification {
step: NonZero::new(1).expect("1 is a valid non-zero usize"),
aggregation: BarAggregation::Day,
price_type: PriceType::Last,
};
#[inline]
#[must_use]
pub fn is_index_symbol(symbol: &Ustr) -> bool {
symbol.starts_with('.')
}
#[must_use]
pub fn parse_book_msg_vec(
data: Vec<BitmexOrderBookMsg>,
action: BitmexAction,
instruments: &AHashMap<Ustr, InstrumentAny>,
ts_init: UnixNanos,
) -> Vec<Data> {
let mut deltas = Vec::with_capacity(data.len());
for msg in data {
if let Some(instrument) = instruments.get(&msg.symbol) {
let instrument_id = instrument.id();
let price_precision = instrument.price_precision();
deltas.push(Data::Delta(parse_book_msg(
&msg,
&action,
instrument,
instrument_id,
price_precision,
ts_init,
)));
} else {
log::error!(
"Instrument cache miss: book delta dropped for symbol={}",
msg.symbol
);
}
}
if let Some(Data::Delta(last_delta)) = deltas.last_mut() {
*last_delta = OrderBookDelta::new(
last_delta.instrument_id,
last_delta.action,
last_delta.order,
last_delta.flags | RecordFlag::F_LAST as u8,
last_delta.sequence,
last_delta.ts_event,
last_delta.ts_init,
);
}
deltas
}
#[must_use]
pub fn parse_book10_msg_vec(
data: Vec<BitmexOrderBook10Msg>,
instruments: &AHashMap<Ustr, InstrumentAny>,
ts_init: UnixNanos,
) -> Vec<Data> {
let mut depths = Vec::with_capacity(data.len());
for msg in data {
if let Some(instrument) = instruments.get(&msg.symbol) {
let instrument_id = instrument.id();
let price_precision = instrument.price_precision();
match parse_book10_msg(&msg, instrument, instrument_id, price_precision, ts_init) {
Ok(depth) => depths.push(Data::Depth10(Box::new(depth))),
Err(e) => {
log::error!("Failed to parse orderBook10 for symbol={}: {e}", msg.symbol);
}
}
} else {
log::error!(
"Instrument cache miss: depth10 message dropped for symbol={}",
msg.symbol
);
}
}
depths
}
#[must_use]
pub fn parse_trade_msg_vec(
data: Vec<BitmexTradeMsg>,
instruments: &AHashMap<Ustr, InstrumentAny>,
ts_init: UnixNanos,
) -> Vec<Data> {
let mut trades = Vec::with_capacity(data.len());
for msg in data {
if let Some(instrument) = instruments.get(&msg.symbol) {
let instrument_id = instrument.id();
let price_precision = instrument.price_precision();
trades.push(Data::Trade(parse_trade_msg(
&msg,
instrument,
instrument_id,
price_precision,
ts_init,
)));
} else {
log::error!(
"Instrument cache miss: trade message dropped for symbol={}",
msg.symbol
);
}
}
trades
}
#[must_use]
pub fn parse_trade_bin_msg_vec(
data: Vec<BitmexTradeBinMsg>,
topic: &BitmexWsTopic,
instruments: &AHashMap<Ustr, InstrumentAny>,
ts_init: UnixNanos,
) -> Vec<Data> {
let mut trades = Vec::with_capacity(data.len());
for msg in data {
if let Some(instrument) = instruments.get(&msg.symbol) {
let instrument_id = instrument.id();
let price_precision = instrument.price_precision();
trades.push(Data::Bar(parse_trade_bin_msg(
&msg,
topic,
instrument,
instrument_id,
price_precision,
ts_init,
)));
} else {
log::error!(
"Instrument cache miss: trade bin (bar) dropped for symbol={}",
msg.symbol
);
}
}
trades
}
#[allow(clippy::too_many_arguments)]
#[must_use]
pub fn parse_book_msg(
msg: &BitmexOrderBookMsg,
action: &BitmexAction,
instrument: &InstrumentAny,
instrument_id: InstrumentId,
price_precision: u8,
ts_init: UnixNanos,
) -> OrderBookDelta {
let flags = if action == &BitmexAction::Partial {
RecordFlag::F_SNAPSHOT as u8
} else {
0
};
let action = action.as_book_action();
let price = Price::new(msg.price, price_precision);
let side = msg.side.as_order_side();
let size = parse_contracts_quantity(msg.size.unwrap_or(0), instrument);
let order_id = msg.id;
let order = BookOrder::new(side, price, size, order_id);
let sequence = 0; let ts_event = UnixNanos::from(msg.timestamp);
OrderBookDelta::new(
instrument_id,
action,
order,
flags,
sequence,
ts_event,
ts_init,
)
}
#[allow(clippy::too_many_arguments)]
pub fn parse_book10_msg(
msg: &BitmexOrderBook10Msg,
instrument: &InstrumentAny,
instrument_id: InstrumentId,
price_precision: u8,
ts_init: UnixNanos,
) -> anyhow::Result<OrderBookDepth10> {
let mut bids = Vec::with_capacity(DEPTH10_LEN);
let mut asks = Vec::with_capacity(DEPTH10_LEN);
let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
for (i, level) in msg.bids.iter().enumerate() {
let bid_order = BookOrder::new(
OrderSide::Buy,
Price::new(level[0], price_precision),
parse_fractional_quantity(level[1], instrument),
0,
);
bids.push(bid_order);
bid_counts[i] = 1;
}
for (i, level) in msg.asks.iter().enumerate() {
let ask_order = BookOrder::new(
OrderSide::Sell,
Price::new(level[0], price_precision),
parse_fractional_quantity(level[1], instrument),
0,
);
asks.push(ask_order);
ask_counts[i] = 1;
}
let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
anyhow::anyhow!(
"Bids length mismatch: expected {DEPTH10_LEN}, was {}",
v.len()
)
})?;
let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
anyhow::anyhow!(
"Asks length mismatch: expected {DEPTH10_LEN}, was {}",
v.len()
)
})?;
let ts_event = UnixNanos::from(msg.timestamp);
Ok(OrderBookDepth10::new(
instrument_id,
bids,
asks,
bid_counts,
ask_counts,
RecordFlag::F_SNAPSHOT as u8,
0, ts_event,
ts_init,
))
}
#[must_use]
pub fn parse_quote_msg(
msg: &BitmexQuoteMsg,
last_quote: &QuoteTick,
instrument: &InstrumentAny,
instrument_id: InstrumentId,
price_precision: u8,
ts_init: UnixNanos,
) -> QuoteTick {
let bid_price = match msg.bid_price {
Some(price) => Price::new(price, price_precision),
None => last_quote.bid_price,
};
let ask_price = match msg.ask_price {
Some(price) => Price::new(price, price_precision),
None => last_quote.ask_price,
};
let bid_size = match msg.bid_size {
Some(size) => parse_contracts_quantity(size, instrument),
None => last_quote.bid_size,
};
let ask_size = match msg.ask_size {
Some(size) => parse_contracts_quantity(size, instrument),
None => last_quote.ask_size,
};
let ts_event = UnixNanos::from(msg.timestamp);
QuoteTick::new(
instrument_id,
bid_price,
ask_price,
bid_size,
ask_size,
ts_event,
ts_init,
)
}
#[must_use]
pub fn parse_trade_msg(
msg: &BitmexTradeMsg,
instrument: &InstrumentAny,
instrument_id: InstrumentId,
price_precision: u8,
ts_init: UnixNanos,
) -> TradeTick {
let price = Price::new(msg.price, price_precision);
let size = parse_contracts_quantity(msg.size, instrument);
let aggressor_side = msg.side.as_aggressor_side();
let trade_id = TradeId::new(
msg.trd_match_id
.map_or_else(|| Uuid::new_v4().to_string(), |uuid| uuid.to_string()),
);
let ts_event = UnixNanos::from(msg.timestamp);
TradeTick::new(
instrument_id,
price,
size,
aggressor_side,
trade_id,
ts_event,
ts_init,
)
}
#[must_use]
pub fn parse_trade_bin_msg(
msg: &BitmexTradeBinMsg,
topic: &BitmexWsTopic,
instrument: &InstrumentAny,
instrument_id: InstrumentId,
price_precision: u8,
ts_init: UnixNanos,
) -> Bar {
let spec = bar_spec_from_topic(topic);
let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
let open = Price::new(msg.open, price_precision);
let high = Price::new(msg.high, price_precision);
let low = Price::new(msg.low, price_precision);
let close = Price::new(msg.close, price_precision);
let (open, high, low, close) =
normalize_trade_bin_prices(open, high, low, close, &msg.symbol, Some(&bar_type));
let volume_contracts = normalize_trade_bin_volume(Some(msg.volume), &msg.symbol);
let volume = parse_contracts_quantity(volume_contracts, instrument);
let ts_event = UnixNanos::from(msg.timestamp);
Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
}
#[must_use]
pub fn bar_spec_from_topic(topic: &BitmexWsTopic) -> BarSpecification {
match topic {
BitmexWsTopic::TradeBin1m => BAR_SPEC_1_MINUTE,
BitmexWsTopic::TradeBin5m => BAR_SPEC_5_MINUTE,
BitmexWsTopic::TradeBin1h => BAR_SPEC_1_HOUR,
BitmexWsTopic::TradeBin1d => BAR_SPEC_1_DAY,
_ => {
log::error!("Bar specification not supported: topic={topic:?}");
BAR_SPEC_1_MINUTE
}
}
}
#[must_use]
pub fn topic_from_bar_spec(spec: BarSpecification) -> BitmexWsTopic {
match spec {
BAR_SPEC_1_MINUTE => BitmexWsTopic::TradeBin1m,
BAR_SPEC_5_MINUTE => BitmexWsTopic::TradeBin5m,
BAR_SPEC_1_HOUR => BitmexWsTopic::TradeBin1h,
BAR_SPEC_1_DAY => BitmexWsTopic::TradeBin1d,
_ => {
log::error!("Bar specification not supported: spec={spec:?}");
BitmexWsTopic::TradeBin1m
}
}
}
fn infer_order_type_from_msg(msg: &BitmexOrderMsg) -> OrderType {
if msg.stop_px.is_some() {
if msg.price.is_some() {
OrderType::StopLimit
} else {
OrderType::StopMarket
}
} else if msg.price.is_some() {
OrderType::Limit
} else {
OrderType::Market
}
}
pub fn parse_order_msg(
msg: &BitmexOrderMsg,
instrument: &InstrumentAny,
order_type_cache: &mut AHashMap<ClientOrderId, OrderType>,
ts_init: UnixNanos,
) -> anyhow::Result<OrderStatusReport> {
let account_id = AccountId::new(format!("BITMEX-{}", msg.account)); let instrument_id = parse_instrument_id(msg.symbol);
let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
let common_side: BitmexSide = msg.side.into();
let order_side: OrderSide = common_side.into();
let order_type: OrderType = if let Some(ord_type) = msg.ord_type {
if ord_type == BitmexOrderType::Pegged
&& msg.peg_price_type == Some(BitmexPegPriceType::TrailingStopPeg)
{
if msg.price.is_some() {
OrderType::TrailingStopLimit
} else {
OrderType::TrailingStopMarket
}
} else {
ord_type.into()
}
} else if let Some(client_order_id) = msg.cl_ord_id {
let client_order_id = ClientOrderId::new(client_order_id);
if let Some(&cached) = order_type_cache.get(&client_order_id) {
cached
} else {
let inferred = infer_order_type_from_msg(msg);
order_type_cache.insert(client_order_id, inferred);
inferred
}
} else {
infer_order_type_from_msg(msg)
};
let time_in_force: TimeInForce = match msg.time_in_force {
Some(tif) => tif.try_into().map_err(|e| anyhow::anyhow!("{e}"))?,
None => TimeInForce::Gtc,
};
let order_status: OrderStatus = msg.ord_status.into();
let quantity = parse_signed_contracts_quantity(msg.order_qty, instrument);
let filled_qty = parse_signed_contracts_quantity(msg.cum_qty, instrument);
let report_id = UUID4::new();
let ts_accepted =
parse_optional_datetime_to_unix_nanos(&Some(msg.transact_time), "transact_time");
let ts_last = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
let mut report = OrderStatusReport::new(
account_id,
instrument_id,
None, venue_order_id,
order_side,
order_type,
time_in_force,
order_status,
quantity,
filled_qty,
ts_accepted,
ts_last,
ts_init,
Some(report_id),
);
if let Some(cl_ord_id) = &msg.cl_ord_id {
report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
}
if let Some(cl_ord_link_id) = &msg.cl_ord_link_id {
report = report.with_order_list_id(OrderListId::new(cl_ord_link_id));
}
if let Some(price) = msg.price {
report = report.with_price(Price::new(price, instrument.price_precision()));
}
if let Some(avg_px) = msg.avg_px {
report = report.with_avg_px(avg_px)?;
}
if let Some(trigger_price) = msg.stop_px {
report = report
.with_trigger_price(Price::new(trigger_price, instrument.price_precision()))
.with_trigger_type(extract_trigger_type(msg.exec_inst.as_ref()));
}
if matches!(
order_type,
OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
) && let Some(peg_offset) = msg.peg_offset_value
{
let trailing_offset = Decimal::try_from(peg_offset.abs())
.unwrap_or_else(|_| Decimal::new(peg_offset.abs() as i64, 0));
report = report
.with_trailing_offset(trailing_offset)
.with_trailing_offset_type(TrailingOffsetType::Price);
if msg.stop_px.is_none() {
report = report.with_trigger_type(extract_trigger_type(msg.exec_inst.as_ref()));
}
}
if let Some(exec_insts) = &msg.exec_inst {
for exec_inst in exec_insts {
match exec_inst {
BitmexExecInstruction::ParticipateDoNotInitiate => {
report = report.with_post_only(true);
}
BitmexExecInstruction::ReduceOnly => {
report = report.with_reduce_only(true);
}
_ => {}
}
}
}
if order_status == OrderStatus::Rejected {
if let Some(reason_str) = msg.ord_rej_reason.or(msg.text) {
log::debug!(
"Order rejected with reason: order_id={:?}, client_order_id={:?}, reason={:?}",
venue_order_id,
msg.cl_ord_id,
reason_str,
);
report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
} else {
log::debug!(
"Order rejected without reason from BitMEX: order_id={:?}, client_order_id={:?}, ord_status={:?}, ord_rej_reason={:?}, text={:?}",
venue_order_id,
msg.cl_ord_id,
msg.ord_status,
msg.ord_rej_reason,
msg.text,
);
}
}
if order_status == OrderStatus::Canceled
&& let Some(reason_str) = msg.ord_rej_reason.or(msg.text)
{
report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
}
Ok(report)
}
#[derive(Debug, Clone)]
pub enum ParsedOrderEvent {
Accepted(OrderAccepted),
Canceled(OrderCanceled),
Expired(OrderExpired),
Triggered(OrderTriggered),
Rejected(OrderRejected),
}
pub fn parse_order_event(
msg: &BitmexOrderMsg,
client_order_id: ClientOrderId,
account_id: AccountId,
trader_id: TraderId,
strategy_id: StrategyId,
ts_init: UnixNanos,
) -> Option<ParsedOrderEvent> {
let instrument_id = parse_instrument_id(msg.symbol);
let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
match msg.ord_status {
BitmexOrderStatus::New => {
let accepted = OrderAccepted::new(
trader_id,
strategy_id,
instrument_id,
client_order_id,
venue_order_id,
account_id,
UUID4::new(),
ts_event,
ts_init,
false,
);
Some(ParsedOrderEvent::Accepted(accepted))
}
BitmexOrderStatus::Canceled => {
let cancel_reason = msg
.ord_rej_reason
.or(msg.text)
.map(|r| clean_reason(r.as_ref()));
let is_post_only_rejection = cancel_reason
.as_deref()
.is_some_and(|r| r.contains("ParticipateDoNotInitiate"));
if is_post_only_rejection {
let rejected = OrderRejected::new(
trader_id,
strategy_id,
instrument_id,
client_order_id,
account_id,
Ustr::from(
cancel_reason
.as_deref()
.unwrap_or("Post-only order rejected"),
),
UUID4::new(),
ts_event,
ts_init,
false,
true, );
Some(ParsedOrderEvent::Rejected(rejected))
} else {
let canceled = OrderCanceled::new(
trader_id,
strategy_id,
instrument_id,
client_order_id,
UUID4::new(),
ts_event,
ts_init,
false,
Some(venue_order_id),
Some(account_id),
);
Some(ParsedOrderEvent::Canceled(canceled))
}
}
BitmexOrderStatus::Expired => {
let expired = OrderExpired::new(
trader_id,
strategy_id,
instrument_id,
client_order_id,
UUID4::new(),
ts_event,
ts_init,
false,
Some(venue_order_id),
Some(account_id),
);
Some(ParsedOrderEvent::Expired(expired))
}
_ => None,
}
}
pub fn parse_order_update_msg(
msg: &BitmexOrderUpdateMsg,
instrument: &InstrumentAny,
account_id: AccountId,
ts_init: UnixNanos,
) -> Option<OrderUpdated> {
let trader_id = TraderId::external();
let strategy_id = StrategyId::external();
let instrument_id = parse_instrument_id(msg.symbol);
let venue_order_id = Some(VenueOrderId::new(msg.order_id.to_string()));
let client_order_id = msg
.cl_ord_id
.as_ref()
.map_or_else(ClientOrderId::external, ClientOrderId::new);
let quantity = match (msg.leaves_qty, msg.cum_qty) {
(Some(leaves), Some(cum)) => parse_contracts_quantity((leaves + cum) as u64, instrument),
_ => Quantity::zero(instrument.size_precision()),
};
let price = msg
.price
.map(|p| Price::new(p, instrument.price_precision()));
let trigger_price = None;
let protection_price = None;
let event_id = UUID4::new();
let ts_event = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
Some(OrderUpdated::new(
trader_id,
strategy_id,
instrument_id,
client_order_id,
quantity,
event_id,
ts_event,
ts_init,
false, venue_order_id,
Some(account_id),
price,
trigger_price,
protection_price,
false, ))
}
pub fn parse_execution_msg(
msg: BitmexExecutionMsg,
instrument: &InstrumentAny,
ts_init: UnixNanos,
) -> Option<FillReport> {
let exec_type = msg.exec_type?;
match exec_type {
BitmexExecType::Trade | BitmexExecType::Liquidation => {}
BitmexExecType::Bankruptcy => {
log::warn!(
"Processing bankruptcy execution as fill: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
msg.order_id,
msg.symbol,
);
}
BitmexExecType::Settlement => {
log::debug!(
"Settlement execution skipped (not a fill): applies quanto conversion/PnL transfer on contract settlement: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
msg.order_id,
msg.symbol,
);
return None;
}
BitmexExecType::TrialFill => {
log::warn!(
"Trial fill execution received (testnet only), not processed as fill: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
msg.order_id,
msg.symbol,
);
return None;
}
BitmexExecType::Funding => {
log::debug!(
"Funding execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
msg.order_id,
msg.symbol,
);
return None;
}
BitmexExecType::Insurance => {
log::debug!(
"Insurance execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
msg.order_id,
msg.symbol,
);
return None;
}
BitmexExecType::Rebalance => {
log::debug!(
"Rebalance execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
msg.order_id,
msg.symbol,
);
return None;
}
BitmexExecType::New
| BitmexExecType::Canceled
| BitmexExecType::CancelReject
| BitmexExecType::Replaced
| BitmexExecType::Rejected
| BitmexExecType::AmendReject
| BitmexExecType::Suspended
| BitmexExecType::Released
| BitmexExecType::TriggeredOrActivatedBySystem => {
log::debug!(
"Execution message skipped (order state change, not a fill): exec_type={exec_type:?}, order_id={:?}",
msg.order_id,
);
return None;
}
BitmexExecType::Unknown(ref type_str) => {
log::warn!(
"Unknown execution type received, skipping: exec_type={type_str}, order_id={:?}, symbol={:?}",
msg.order_id,
msg.symbol,
);
return None;
}
}
let account_id = AccountId::new(format!("BITMEX-{}", msg.account?));
let instrument_id = parse_instrument_id(msg.symbol?);
let venue_order_id = VenueOrderId::new(msg.order_id?.to_string());
let trade_id = TradeId::new(msg.trd_match_id?.to_string());
let order_side: OrderSide = msg.side.map_or(OrderSide::NoOrderSide, |s| {
let side: BitmexSide = s.into();
side.into()
});
let last_qty = parse_signed_contracts_quantity(msg.last_qty?, instrument);
let last_px = Price::new(msg.last_px?, instrument.price_precision());
let settlement_currency_str = msg.settl_currency.unwrap_or(Ustr::from("XBT"));
let mapped_currency = map_bitmex_currency(settlement_currency_str.as_str());
let currency = get_currency(&mapped_currency);
let commission = Money::new(msg.commission.unwrap_or(0.0), currency);
let liquidity_side = parse_liquidity_side(&msg.last_liquidity_ind);
let client_order_id = msg.cl_ord_id.map(ClientOrderId::new);
let venue_position_id = None; let ts_event = parse_optional_datetime_to_unix_nanos(&msg.transact_time, "transact_time");
Some(FillReport::new(
account_id,
instrument_id,
venue_order_id,
trade_id,
order_side,
last_qty,
last_px,
commission,
liquidity_side,
client_order_id,
venue_position_id,
ts_event,
ts_init,
None,
))
}
#[must_use]
pub fn parse_position_msg(
msg: &BitmexPositionMsg,
instrument: &InstrumentAny,
ts_init: UnixNanos,
) -> PositionStatusReport {
let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
let instrument_id = parse_instrument_id(msg.symbol);
let position_side = parse_position_side(msg.current_qty).as_specified();
let quantity = parse_signed_contracts_quantity(msg.current_qty.unwrap_or(0), instrument);
let venue_position_id = None; let avg_px_open = msg
.avg_entry_price
.and_then(|p| Decimal::from_str(&p.to_string()).ok());
let ts_last = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
PositionStatusReport::new(
account_id,
instrument_id,
position_side,
quantity,
ts_last,
ts_init,
None, venue_position_id, avg_px_open, )
}
#[must_use]
pub fn parse_instrument_msg(
msg: &BitmexInstrumentMsg,
instruments_cache: &AHashMap<Ustr, InstrumentAny>,
ts_init: UnixNanos,
) -> Vec<Data> {
let mut updates = Vec::new();
let is_index = is_index_symbol(&msg.symbol);
let effective_index_price = if is_index {
msg.last_price
} else {
msg.index_price
};
if msg.mark_price.is_none() && effective_index_price.is_none() {
return updates;
}
let instrument_id = InstrumentId::new(Symbol::from_ustr_unchecked(msg.symbol), *BITMEX_VENUE);
let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
let price_precision = match instruments_cache.get(&Ustr::from(&msg.symbol)) {
Some(instrument) => instrument.price_precision(),
None => {
if is_index {
log::trace!(
"Index instrument {} not in cache, skipping update",
msg.symbol
);
} else {
log::debug!("Instrument {} not in cache, skipping update", msg.symbol);
}
return updates;
}
};
if let Some(mark_price) = msg.mark_price {
let price = Price::new(mark_price, price_precision);
updates.push(Data::MarkPriceUpdate(MarkPriceUpdate::new(
instrument_id,
price,
ts_event,
ts_init,
)));
}
if let Some(index_price) = effective_index_price {
let price = Price::new(index_price, price_precision);
updates.push(Data::IndexPriceUpdate(IndexPriceUpdate::new(
instrument_id,
price,
ts_event,
ts_init,
)));
}
updates
}
#[must_use]
pub fn parse_funding_msg(msg: &BitmexFundingMsg, ts_init: UnixNanos) -> FundingRateUpdate {
let instrument_id = InstrumentId::from(format!("{}.BITMEX", msg.symbol));
let interval_hours = msg.funding_interval.hour();
let interval_minutes = msg.funding_interval.minute();
let interval = Some((interval_hours * 60 + interval_minutes) as u16);
let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
FundingRateUpdate::new(
instrument_id,
msg.funding_rate,
interval,
None, ts_event,
ts_init,
)
}
#[must_use]
pub fn parse_wallet_msg(msg: &BitmexWalletMsg, ts_init: UnixNanos) -> AccountState {
let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
let currency_str = map_bitmex_currency(msg.currency.as_str());
let currency = get_currency(¤cy_str);
let divisor = if msg.currency == "XBt" {
100_000_000.0 } else if msg.currency == "USDt" || msg.currency == "LAMp" {
1_000_000.0 } else {
1.0
};
let amount = msg.amount.unwrap_or(0) as f64 / divisor;
let total = Money::new(amount, currency);
let locked = Money::new(0.0, currency); let free = total - locked;
let balance = AccountBalance::new_checked(total, locked, free)
.expect("Balance calculation should be valid");
AccountState::new(
account_id,
AccountType::Margin,
vec![balance],
vec![], true, UUID4::new(),
ts_init,
ts_init,
None,
)
}
#[must_use]
pub fn parse_margin_msg(msg: &BitmexMarginMsg, instrument_id: InstrumentId) -> MarginBalance {
let currency_str = map_bitmex_currency(msg.currency.as_str());
let currency = get_currency(¤cy_str);
let divisor = bitmex_currency_divisor(msg.currency.as_str());
let initial_dec = Decimal::from(msg.init_margin.unwrap_or(0).max(0)) / divisor;
let maintenance_dec = Decimal::from(msg.maint_margin.unwrap_or(0).max(0)) / divisor;
MarginBalance::new(
Money::from_decimal(initial_dec, currency).unwrap_or_else(|_| Money::zero(currency)),
Money::from_decimal(maintenance_dec, currency).unwrap_or_else(|_| Money::zero(currency)),
instrument_id,
)
}
#[must_use]
pub fn parse_margin_account_state(msg: &BitmexMarginMsg, ts_init: UnixNanos) -> AccountState {
let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
let balance = parse_account_balance(msg);
let currency_str = map_bitmex_currency(msg.currency.as_str());
let margin_instrument_id = InstrumentId::new(
Symbol::from_str_unchecked(format!("ACCOUNT-{currency_str}")),
*BITMEX_VENUE,
);
let margin = parse_margin_msg(msg, margin_instrument_id);
let margins = if !margin.initial.is_zero() || !margin.maintenance.is_zero() {
vec![margin]
} else {
vec![]
};
let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "margin.timestamp");
AccountState::new(
account_id,
AccountType::Margin,
vec![balance],
margins,
true,
UUID4::new(),
ts_event,
ts_init,
None,
)
}
#[cfg(test)]
mod tests {
use chrono::{DateTime, Utc};
use nautilus_model::{
enums::{AggressorSide, BookAction, LiquiditySide, PositionSide},
identifiers::Symbol,
instruments::crypto_perpetual::CryptoPerpetual,
};
use rstest::rstest;
use ustr::Ustr;
use super::*;
use crate::common::{
enums::{BitmexExecType, BitmexOrderStatus},
testing::load_test_json,
};
fn create_test_perpetual_instrument_with_precisions(
price_precision: u8,
size_precision: u8,
) -> InstrumentAny {
InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
InstrumentId::from("XBTUSD.BITMEX"),
Symbol::new("XBTUSD"),
Currency::BTC(),
Currency::USD(),
Currency::BTC(),
true, price_precision,
size_precision,
Price::new(0.5, price_precision),
Quantity::new(1.0, size_precision),
None, None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
UnixNanos::default(),
))
}
fn create_test_perpetual_instrument() -> InstrumentAny {
create_test_perpetual_instrument_with_precisions(1, 0)
}
#[rstest]
fn test_orderbook_l2_message() {
let json_data = load_test_json("ws_orderbook_l2.json");
let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
let msg: BitmexOrderBookMsg = serde_json::from_str(&json_data).unwrap();
let instrument = create_test_perpetual_instrument();
let delta = parse_book_msg(
&msg,
&BitmexAction::Insert,
&instrument,
instrument.id(),
instrument.price_precision(),
UnixNanos::from(3),
);
assert_eq!(delta.instrument_id, instrument_id);
assert_eq!(delta.order.price, Price::from("98459.9"));
assert_eq!(delta.order.size, Quantity::from(33000));
assert_eq!(delta.order.side, OrderSide::Sell);
assert_eq!(delta.order.order_id, 62400580205);
assert_eq!(delta.action, BookAction::Add);
assert_eq!(delta.flags, 0);
assert_eq!(delta.sequence, 0);
assert_eq!(delta.ts_event, 1732436782356000000); assert_eq!(delta.ts_init, 3);
let delta = parse_book_msg(
&msg,
&BitmexAction::Partial,
&instrument,
instrument.id(),
instrument.price_precision(),
UnixNanos::from(3),
);
assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT as u8);
assert_eq!(delta.action, BookAction::Add);
let delta = parse_book_msg(
&msg,
&BitmexAction::Update,
&instrument,
instrument.id(),
instrument.price_precision(),
UnixNanos::from(3),
);
assert_eq!(delta.flags, 0);
assert_eq!(delta.action, BookAction::Update);
}
#[rstest]
fn test_orderbook10_message() {
let json_data = load_test_json("ws_orderbook_10.json");
let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
let msg: BitmexOrderBook10Msg = serde_json::from_str(&json_data).unwrap();
let instrument = create_test_perpetual_instrument();
let depth10 = parse_book10_msg(
&msg,
&instrument,
instrument.id(),
instrument.price_precision(),
UnixNanos::from(3),
)
.unwrap();
assert_eq!(depth10.instrument_id, instrument_id);
assert_eq!(depth10.bids[0].price, Price::from("98490.3"));
assert_eq!(depth10.bids[0].size, Quantity::from(22400));
assert_eq!(depth10.bids[0].side, OrderSide::Buy);
assert_eq!(depth10.asks[0].price, Price::from("98490.4"));
assert_eq!(depth10.asks[0].size, Quantity::from(17600));
assert_eq!(depth10.asks[0].side, OrderSide::Sell);
assert_eq!(depth10.bid_counts, [1; DEPTH10_LEN]);
assert_eq!(depth10.ask_counts, [1; DEPTH10_LEN]);
assert_eq!(depth10.sequence, 0);
assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
assert_eq!(depth10.ts_event, 1732436353513000000); assert_eq!(depth10.ts_init, 3);
}
#[rstest]
fn test_quote_message() {
let json_data = load_test_json("ws_quote.json");
let instrument_id = InstrumentId::from("BCHUSDT.BITMEX");
let last_quote = QuoteTick::new(
instrument_id,
Price::new(487.50, 2),
Price::new(488.20, 2),
Quantity::from(100_000),
Quantity::from(100_000),
UnixNanos::from(1),
UnixNanos::from(2),
);
let msg: BitmexQuoteMsg = serde_json::from_str(&json_data).unwrap();
let instrument = create_test_perpetual_instrument_with_precisions(2, 0);
let quote = parse_quote_msg(
&msg,
&last_quote,
&instrument,
instrument_id,
instrument.price_precision(),
UnixNanos::from(3),
);
assert_eq!(quote.instrument_id, instrument_id);
assert_eq!(quote.bid_price, Price::from("487.55"));
assert_eq!(quote.ask_price, Price::from("488.25"));
assert_eq!(quote.bid_size, Quantity::from(103_000));
assert_eq!(quote.ask_size, Quantity::from(50_000));
assert_eq!(quote.ts_event, 1732315465085000000);
assert_eq!(quote.ts_init, 3);
}
#[rstest]
fn test_trade_message() {
let json_data = load_test_json("ws_trade.json");
let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
let msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
let instrument = create_test_perpetual_instrument();
let trade = parse_trade_msg(
&msg,
&instrument,
instrument.id(),
instrument.price_precision(),
UnixNanos::from(3),
);
assert_eq!(trade.instrument_id, instrument_id);
assert_eq!(trade.price, Price::from("98570.9"));
assert_eq!(trade.size, Quantity::from(100));
assert_eq!(trade.aggressor_side, AggressorSide::Seller);
assert_eq!(
trade.trade_id.to_string(),
"00000000-006d-1000-0000-000e8737d536"
);
assert_eq!(trade.ts_event, 1732436138704000000); assert_eq!(trade.ts_init, 3);
}
#[rstest]
fn test_trade_bin_message() {
let json_data = load_test_json("ws_trade_bin_1m.json");
let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
let topic = BitmexWsTopic::TradeBin1m;
let msg: BitmexTradeBinMsg = serde_json::from_str(&json_data).unwrap();
let instrument = create_test_perpetual_instrument();
let bar = parse_trade_bin_msg(
&msg,
&topic,
&instrument,
instrument.id(),
instrument.price_precision(),
UnixNanos::from(3),
);
assert_eq!(bar.instrument_id(), instrument_id);
assert_eq!(
bar.bar_type.spec(),
BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
);
assert_eq!(bar.open, Price::from("97550.0"));
assert_eq!(bar.high, Price::from("97584.4"));
assert_eq!(bar.low, Price::from("97550.0"));
assert_eq!(bar.close, Price::from("97570.1"));
assert_eq!(bar.volume, Quantity::from(84_000));
assert_eq!(bar.ts_event, 1732392420000000000); assert_eq!(bar.ts_init, 3);
}
#[rstest]
fn test_trade_bin_message_extreme_adjustment() {
let topic = BitmexWsTopic::TradeBin1m;
let instrument = create_test_perpetual_instrument();
let msg = BitmexTradeBinMsg {
timestamp: DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z")
.unwrap()
.with_timezone(&Utc),
symbol: Ustr::from("XBTUSD"),
open: 50_000.0,
high: 49_990.0,
low: 50_010.0,
close: 50_005.0,
trades: 10,
volume: 1_000,
vwap: Some(0.0),
last_size: Some(0),
turnover: 0,
home_notional: 0.0,
foreign_notional: 0.0,
pool: None,
};
let bar = parse_trade_bin_msg(
&msg,
&topic,
&instrument,
instrument.id(),
instrument.price_precision(),
UnixNanos::from(3),
);
assert_eq!(bar.high, Price::from("50010.0"));
assert_eq!(bar.low, Price::from("49990.0"));
assert_eq!(bar.open, Price::from("50000.0"));
assert_eq!(bar.close, Price::from("50005.0"));
assert_eq!(bar.volume, Quantity::from(1_000));
}
#[rstest]
fn test_parse_order_msg() {
let json_data = load_test_json("ws_order.json");
let msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
let mut cache = AHashMap::new();
let instrument = create_test_perpetual_instrument();
let report = parse_order_msg(&msg, &instrument, &mut cache, UnixNanos::default()).unwrap();
assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
assert_eq!(
report.venue_order_id.to_string(),
"550e8400-e29b-41d4-a716-446655440001"
);
assert_eq!(
report.client_order_id.unwrap().to_string(),
"mm_bitmex_1a/oemUeQ4CAJZgP3fjHsA"
);
assert_eq!(report.order_side, OrderSide::Buy);
assert_eq!(report.order_type, OrderType::Limit);
assert_eq!(report.time_in_force, TimeInForce::Gtc);
assert_eq!(report.order_status, OrderStatus::Accepted);
assert_eq!(report.quantity, Quantity::from(100));
assert_eq!(report.filled_qty, Quantity::from(0));
assert_eq!(report.price.unwrap(), Price::from("98000.0"));
assert_eq!(report.ts_accepted, 1732530600000000000); }
#[rstest]
fn test_parse_order_msg_infers_type_when_missing() {
let json_data = load_test_json("ws_order.json");
let mut msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
msg.ord_type = None;
msg.cl_ord_id = None;
msg.price = Some(98_000.0);
msg.stop_px = None;
let mut cache = AHashMap::new();
let instrument = create_test_perpetual_instrument();
let report = parse_order_msg(&msg, &instrument, &mut cache, UnixNanos::default()).unwrap();
assert_eq!(report.order_type, OrderType::Limit);
}
#[rstest]
fn test_parse_order_msg_rejected_with_reason() {
let mut msg: BitmexOrderMsg =
serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
msg.ord_status = BitmexOrderStatus::Rejected;
msg.ord_rej_reason = Some(Ustr::from("Insufficient available balance"));
msg.text = None;
msg.cum_qty = 0;
let mut cache = AHashMap::new();
let instrument = create_test_perpetual_instrument();
let report = parse_order_msg(&msg, &instrument, &mut cache, UnixNanos::default()).unwrap();
assert_eq!(report.order_status, OrderStatus::Rejected);
assert_eq!(
report.cancel_reason,
Some("Insufficient available balance".to_string())
);
}
#[rstest]
fn test_parse_order_msg_rejected_with_text_fallback() {
let mut msg: BitmexOrderMsg =
serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
msg.ord_status = BitmexOrderStatus::Rejected;
msg.ord_rej_reason = None;
msg.text = Some(Ustr::from("Order would execute immediately"));
msg.cum_qty = 0;
let mut cache = AHashMap::new();
let instrument = create_test_perpetual_instrument();
let report = parse_order_msg(&msg, &instrument, &mut cache, UnixNanos::default()).unwrap();
assert_eq!(report.order_status, OrderStatus::Rejected);
assert_eq!(
report.cancel_reason,
Some("Order would execute immediately".to_string())
);
}
#[rstest]
fn test_parse_order_msg_rejected_without_reason() {
let mut msg: BitmexOrderMsg =
serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
msg.ord_status = BitmexOrderStatus::Rejected;
msg.ord_rej_reason = None;
msg.text = None;
msg.cum_qty = 0;
let mut cache = AHashMap::new();
let instrument = create_test_perpetual_instrument();
let report = parse_order_msg(&msg, &instrument, &mut cache, UnixNanos::default()).unwrap();
assert_eq!(report.order_status, OrderStatus::Rejected);
assert_eq!(report.cancel_reason, None);
}
#[rstest]
fn test_parse_execution_msg() {
let json_data = load_test_json("ws_execution.json");
let msg: BitmexExecutionMsg = serde_json::from_str(&json_data).unwrap();
let instrument = create_test_perpetual_instrument();
let fill = parse_execution_msg(msg, &instrument, UnixNanos::default()).unwrap();
assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
assert_eq!(
fill.venue_order_id.to_string(),
"550e8400-e29b-41d4-a716-446655440002"
);
assert_eq!(
fill.trade_id.to_string(),
"00000000-006d-1000-0000-000e8737d540"
);
assert_eq!(
fill.client_order_id.unwrap().to_string(),
"mm_bitmex_2b/oemUeQ4CAJZgP3fjHsB"
);
assert_eq!(fill.order_side, OrderSide::Sell);
assert_eq!(fill.last_qty, Quantity::from(100));
assert_eq!(fill.last_px, Price::from("98950.0"));
assert_eq!(fill.liquidity_side, LiquiditySide::Maker);
assert_eq!(fill.commission, Money::new(0.00075, Currency::from("XBT")));
assert_eq!(fill.commission.currency.code.to_string(), "XBT");
assert_eq!(fill.ts_event, 1732530900789000000); }
#[rstest]
fn test_parse_execution_msg_non_trade() {
let mut msg: BitmexExecutionMsg =
serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
msg.exec_type = Some(BitmexExecType::Settlement);
let instrument = create_test_perpetual_instrument();
let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
assert!(result.is_none());
}
#[rstest]
fn test_parse_cancel_reject_execution() {
let json = load_test_json("ws_execution_cancel_reject.json");
let msg: BitmexExecutionMsg = serde_json::from_str(&json).unwrap();
assert_eq!(msg.exec_type, Some(BitmexExecType::CancelReject));
assert_eq!(msg.ord_status, Some(BitmexOrderStatus::Rejected));
assert_eq!(msg.symbol, None);
let instrument = create_test_perpetual_instrument();
let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
assert!(result.is_none());
}
#[rstest]
fn test_parse_execution_msg_liquidation() {
let mut msg: BitmexExecutionMsg =
serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
msg.exec_type = Some(BitmexExecType::Liquidation);
let instrument = create_test_perpetual_instrument();
let fill = parse_execution_msg(msg, &instrument, UnixNanos::default()).unwrap();
assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
assert_eq!(fill.order_side, OrderSide::Sell);
assert_eq!(fill.last_qty, Quantity::from(100));
assert_eq!(fill.last_px, Price::from("98950.0"));
}
#[rstest]
fn test_parse_execution_msg_bankruptcy() {
let mut msg: BitmexExecutionMsg =
serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
msg.exec_type = Some(BitmexExecType::Bankruptcy);
let instrument = create_test_perpetual_instrument();
let fill = parse_execution_msg(msg, &instrument, UnixNanos::default()).unwrap();
assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
assert_eq!(fill.order_side, OrderSide::Sell);
assert_eq!(fill.last_qty, Quantity::from(100));
}
#[rstest]
fn test_parse_execution_msg_settlement() {
let mut msg: BitmexExecutionMsg =
serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
msg.exec_type = Some(BitmexExecType::Settlement);
let instrument = create_test_perpetual_instrument();
let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
assert!(result.is_none());
}
#[rstest]
fn test_parse_execution_msg_trial_fill() {
let mut msg: BitmexExecutionMsg =
serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
msg.exec_type = Some(BitmexExecType::TrialFill);
let instrument = create_test_perpetual_instrument();
let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
assert!(result.is_none());
}
#[rstest]
fn test_parse_execution_msg_funding() {
let mut msg: BitmexExecutionMsg =
serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
msg.exec_type = Some(BitmexExecType::Funding);
let instrument = create_test_perpetual_instrument();
let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
assert!(result.is_none());
}
#[rstest]
fn test_parse_execution_msg_insurance() {
let mut msg: BitmexExecutionMsg =
serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
msg.exec_type = Some(BitmexExecType::Insurance);
let instrument = create_test_perpetual_instrument();
let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
assert!(result.is_none());
}
#[rstest]
fn test_parse_execution_msg_rebalance() {
let mut msg: BitmexExecutionMsg =
serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
msg.exec_type = Some(BitmexExecType::Rebalance);
let instrument = create_test_perpetual_instrument();
let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
assert!(result.is_none());
}
#[rstest]
fn test_parse_execution_msg_order_state_changes() {
let instrument = create_test_perpetual_instrument();
let order_state_types = vec![
BitmexExecType::New,
BitmexExecType::Canceled,
BitmexExecType::CancelReject,
BitmexExecType::Replaced,
BitmexExecType::Rejected,
BitmexExecType::AmendReject,
BitmexExecType::Suspended,
BitmexExecType::Released,
BitmexExecType::TriggeredOrActivatedBySystem,
];
for exec_type in order_state_types {
let mut msg: BitmexExecutionMsg =
serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
msg.exec_type = Some(exec_type.clone());
let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
assert!(
result.is_none(),
"Expected None for exec_type {exec_type:?}"
);
}
}
#[rstest]
fn test_parse_position_msg() {
let json_data = load_test_json("ws_position.json");
let msg: BitmexPositionMsg = serde_json::from_str(&json_data).unwrap();
let instrument = create_test_perpetual_instrument();
let report = parse_position_msg(&msg, &instrument, UnixNanos::default());
assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
assert_eq!(report.quantity, Quantity::from(1000));
assert!(report.venue_position_id.is_none());
assert_eq!(report.ts_last, 1732530900789000000); }
#[rstest]
fn test_parse_position_msg_short() {
let mut msg: BitmexPositionMsg =
serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
msg.current_qty = Some(-500);
let instrument = create_test_perpetual_instrument();
let report = parse_position_msg(&msg, &instrument, UnixNanos::default());
assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
assert_eq!(report.quantity, Quantity::from(500));
}
#[rstest]
fn test_parse_position_msg_flat() {
let mut msg: BitmexPositionMsg =
serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
msg.current_qty = Some(0);
let instrument = create_test_perpetual_instrument();
let report = parse_position_msg(&msg, &instrument, UnixNanos::default());
assert_eq!(report.position_side.as_position_side(), PositionSide::Flat);
assert_eq!(report.quantity, Quantity::from(0));
}
#[rstest]
fn test_parse_wallet_msg() {
let json_data = load_test_json("ws_wallet.json");
let msg: BitmexWalletMsg = serde_json::from_str(&json_data).unwrap();
let ts_init = UnixNanos::from(1);
let account_state = parse_wallet_msg(&msg, ts_init);
assert_eq!(account_state.account_id.to_string(), "BITMEX-1234567");
assert!(!account_state.balances.is_empty());
let balance = &account_state.balances[0];
assert_eq!(balance.currency.code.to_string(), "XBT");
assert!((balance.total.as_f64() - 1.0000518).abs() < 1e-7);
}
#[rstest]
fn test_parse_wallet_msg_no_amount() {
let mut msg: BitmexWalletMsg =
serde_json::from_str(&load_test_json("ws_wallet.json")).unwrap();
msg.amount = None;
let ts_init = UnixNanos::from(1);
let account_state = parse_wallet_msg(&msg, ts_init);
let balance = &account_state.balances[0];
assert_eq!(balance.total.as_f64(), 0.0);
}
#[rstest]
fn test_parse_margin_msg() {
let json_data = load_test_json("ws_margin.json");
let msg: BitmexMarginMsg = serde_json::from_str(&json_data).unwrap();
let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
let margin_balance = parse_margin_msg(&msg, instrument_id);
assert_eq!(margin_balance.currency.code.to_string(), "XBT");
assert_eq!(margin_balance.instrument_id, instrument_id);
assert_eq!(margin_balance.initial.as_f64(), 0.0);
assert!((margin_balance.maintenance.as_f64() - 0.00015949).abs() < 1e-8);
}
#[rstest]
fn test_parse_margin_msg_no_available() {
let mut msg: BitmexMarginMsg =
serde_json::from_str(&load_test_json("ws_margin.json")).unwrap();
msg.available_margin = None;
let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
let margin_balance = parse_margin_msg(&msg, instrument_id);
assert!(margin_balance.initial.as_f64() >= 0.0);
assert!(margin_balance.maintenance.as_f64() >= 0.0);
}
#[rstest]
fn test_parse_margin_account_state_includes_margins() {
let msg = BitmexMarginMsg {
account: 123456,
currency: Ustr::from("USDt"),
risk_limit: None,
amount: Some(5_000_000_000),
prev_realised_pnl: None,
gross_comm: None,
gross_open_cost: None,
gross_open_premium: None,
gross_exec_cost: None,
gross_mark_value: None,
risk_value: None,
init_margin: Some(200_000_000), maint_margin: Some(100_000_000), target_excess_margin: None,
realised_pnl: None,
unrealised_pnl: None,
wallet_balance: Some(5_000_000_000), margin_balance: None,
margin_leverage: None,
margin_used_pcnt: None,
excess_margin: None,
available_margin: Some(4_800_000_000), withdrawable_margin: None,
maker_fee_discount: None,
taker_fee_discount: None,
timestamp: DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap(),
foreign_margin_balance: None,
foreign_requirement: None,
};
let ts_init = UnixNanos::from(1_000_000_000u64);
let state = parse_margin_account_state(&msg, ts_init);
assert_eq!(state.account_id.to_string(), "BITMEX-123456");
assert_eq!(state.account_type, AccountType::Margin);
assert_eq!(state.balances.len(), 1);
assert_eq!(state.margins.len(), 1);
let balance = &state.balances[0];
assert_eq!(balance.total.as_f64(), 5000.0);
let margin = &state.margins[0];
assert_eq!(margin.instrument_id.symbol.as_str(), "ACCOUNT-USDT");
assert_eq!(margin.instrument_id.venue.as_str(), "BITMEX");
assert_eq!(margin.initial.as_f64(), 200.0);
assert_eq!(margin.maintenance.as_f64(), 100.0);
}
#[rstest]
fn test_parse_margin_account_state_zero_margins_excluded() {
let msg = BitmexMarginMsg {
account: 123456,
currency: Ustr::from("XBt"),
risk_limit: None,
amount: Some(100_000_000),
prev_realised_pnl: None,
gross_comm: None,
gross_open_cost: None,
gross_open_premium: None,
gross_exec_cost: None,
gross_mark_value: None,
risk_value: None,
init_margin: Some(0),
maint_margin: Some(0),
target_excess_margin: None,
realised_pnl: None,
unrealised_pnl: None,
wallet_balance: Some(100_000_000),
margin_balance: None,
margin_leverage: None,
margin_used_pcnt: None,
excess_margin: None,
available_margin: Some(100_000_000),
withdrawable_margin: None,
maker_fee_discount: None,
taker_fee_discount: None,
timestamp: DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap(),
foreign_margin_balance: None,
foreign_requirement: None,
};
let state = parse_margin_account_state(&msg, UnixNanos::from(1_000_000_000u64));
assert_eq!(state.balances.len(), 1);
assert_eq!(state.margins.len(), 0);
}
#[rstest]
fn test_parse_instrument_msg_both_prices() {
let json_data = load_test_json("ws_instrument.json");
let msg: BitmexInstrumentMsg = serde_json::from_str(&json_data).unwrap();
let mut instruments_cache = AHashMap::new();
let test_instrument = create_test_perpetual_instrument();
instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
let updates = parse_instrument_msg(&msg, &instruments_cache, UnixNanos::from(1));
assert_eq!(updates.len(), 2);
match &updates[0] {
Data::MarkPriceUpdate(update) => {
assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
assert_eq!(update.value.as_f64(), 95125.7);
}
_ => panic!("Expected MarkPriceUpdate at index 0"),
}
match &updates[1] {
Data::IndexPriceUpdate(update) => {
assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
assert_eq!(update.value.as_f64(), 95124.3);
}
_ => panic!("Expected IndexPriceUpdate at index 1"),
}
}
#[rstest]
fn test_parse_instrument_msg_mark_price_only() {
let mut msg: BitmexInstrumentMsg =
serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
msg.index_price = None;
let mut instruments_cache = AHashMap::new();
let test_instrument = create_test_perpetual_instrument();
instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
let updates = parse_instrument_msg(&msg, &instruments_cache, UnixNanos::from(1));
assert_eq!(updates.len(), 1);
match &updates[0] {
Data::MarkPriceUpdate(update) => {
assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
assert_eq!(update.value.as_f64(), 95125.7);
}
_ => panic!("Expected MarkPriceUpdate"),
}
}
#[rstest]
fn test_parse_instrument_msg_index_price_only() {
let mut msg: BitmexInstrumentMsg =
serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
msg.mark_price = None;
let mut instruments_cache = AHashMap::new();
let test_instrument = create_test_perpetual_instrument();
instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
let updates = parse_instrument_msg(&msg, &instruments_cache, UnixNanos::from(1));
assert_eq!(updates.len(), 1);
match &updates[0] {
Data::IndexPriceUpdate(update) => {
assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
assert_eq!(update.value.as_f64(), 95124.3);
}
_ => panic!("Expected IndexPriceUpdate"),
}
}
#[rstest]
fn test_parse_instrument_msg_no_prices() {
let mut msg: BitmexInstrumentMsg =
serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
msg.mark_price = None;
msg.index_price = None;
msg.last_price = None;
let mut instruments_cache = AHashMap::new();
let test_instrument = create_test_perpetual_instrument();
instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
let updates = parse_instrument_msg(&msg, &instruments_cache, UnixNanos::from(1));
assert_eq!(updates.len(), 0);
}
#[rstest]
fn test_parse_instrument_msg_index_symbol() {
let mut msg: BitmexInstrumentMsg =
serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
msg.symbol = Ustr::from(".BXBT");
msg.last_price = Some(119163.05);
msg.mark_price = Some(119163.05); msg.index_price = None;
let instrument_id = InstrumentId::from(".BXBT.BITMEX");
let instrument = CryptoPerpetual::new(
instrument_id,
Symbol::from(".BXBT"),
Currency::BTC(),
Currency::USD(),
Currency::USD(),
false, 2, 8, Price::from("0.01"),
Quantity::from("0.00000001"),
None, None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(), UnixNanos::default(), );
let mut instruments_cache = AHashMap::new();
instruments_cache.insert(
Ustr::from(".BXBT"),
InstrumentAny::CryptoPerpetual(instrument),
);
let updates = parse_instrument_msg(&msg, &instruments_cache, UnixNanos::from(1));
assert_eq!(updates.len(), 2);
match &updates[0] {
Data::MarkPriceUpdate(update) => {
assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
assert_eq!(update.value, Price::from("119163.05"));
}
_ => panic!("Expected MarkPriceUpdate for index symbol"),
}
match &updates[1] {
Data::IndexPriceUpdate(update) => {
assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
assert_eq!(update.value, Price::from("119163.05"));
assert_eq!(update.ts_init, UnixNanos::from(1));
}
_ => panic!("Expected IndexPriceUpdate for index symbol"),
}
}
#[rstest]
fn test_parse_funding_msg() {
let json_data = load_test_json("ws_funding_rate.json");
let msg: BitmexFundingMsg = serde_json::from_str(&json_data).unwrap();
let update = parse_funding_msg(&msg, UnixNanos::from(1));
assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
assert_eq!(update.rate.to_string(), "0.0001");
assert_eq!(update.interval, Some(60 * 8));
assert!(update.next_funding_ns.is_none());
assert_eq!(update.ts_event, UnixNanos::from(1732507200000000000));
assert_eq!(update.ts_init, UnixNanos::from(1));
}
}