use std::{
future::Future,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};
use ahash::AHashMap;
use anyhow::Context;
use futures_util::StreamExt;
use nautilus_common::{
cache::quote::QuoteCache,
clients::DataClient,
live::{runner::get_data_event_sender, runtime::get_runtime},
messages::{
DataEvent,
data::{
BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates, SubscribeIndexPrices,
SubscribeInstrument, SubscribeInstrumentStatus, SubscribeInstruments,
SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeFundingRates,
UnsubscribeIndexPrices, UnsubscribeInstrumentStatus, UnsubscribeMarkPrices,
UnsubscribeQuotes, UnsubscribeTrades,
},
},
};
use nautilus_core::{
AtomicMap, UnixNanos,
datetime::datetime_to_unix_nanos,
time::{AtomicTime, get_atomic_clock_realtime},
};
use nautilus_model::{
data::{Data, InstrumentStatus},
enums::{BookType, MarketStatusAction},
identifiers::{ClientId, InstrumentId, Venue},
instruments::{Instrument, InstrumentAny},
types::Price,
};
use tokio::{task::JoinHandle, time::Duration};
use tokio_util::sync::CancellationToken;
use ustr::Ustr;
use crate::{
common::{
consts::BITMEX_VENUE,
enums::BitmexInstrumentState,
parse::{
parse_contracts_quantity, parse_instrument_id, parse_optional_datetime_to_unix_nanos,
},
},
config::BitmexDataClientConfig,
http::{
client::BitmexHttpClient,
parse::{InstrumentParseResult, parse_instrument_any},
},
websocket::{
client::BitmexWebSocketClient,
enums::{BitmexAction, BitmexBookChannel, BitmexWsTopic},
messages::{BitmexQuoteMsg, BitmexTableMessage, BitmexWsMessage},
parse::{
parse_book_msg_vec, parse_book10_msg_vec, parse_funding_msg, parse_instrument_msg,
parse_trade_bin_msg_vec, parse_trade_msg_vec,
},
},
};
/// Live market data client for BitMEX combining an HTTP client (instrument and
/// historical requests) with a websocket client (streaming subscriptions).
///
/// Parsed data and responses are pushed to the `data_sender` channel as [`DataEvent`]s.
#[derive(Debug)]
pub struct BitmexDataClient {
/// Identifier reported by `DataClient::client_id` and used as the default response client ID.
client_id: ClientId,
/// Clock used to stamp `ts_init` on parsed data and responses.
clock: &'static AtomicTime,
/// Configuration supplied at construction.
config: BitmexDataClientConfig,
/// HTTP client used for instrument, trade and bar requests.
http_client: BitmexHttpClient,
/// Websocket client; `None` until `connect` creates it.
ws_client: Option<BitmexWebSocketClient>,
/// Connection flag set at the end of `connect` and cleared by `stop`, `reset` and `disconnect`.
is_connected: AtomicBool,
/// Token observed by the stream and instrument refresh tasks to stop their loops.
cancellation_token: CancellationToken,
/// Handles of the spawned stream and instrument refresh tasks (joined in `disconnect`).
tasks: Vec<JoinHandle<()>>,
/// Channel on which data events and responses are emitted.
data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
/// Shared instrument cache keyed by instrument ID.
instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
/// Book channel recorded per instrument on subscribe; consulted when unsubscribing book deltas.
book_channels: Arc<AtomicMap<InstrumentId, BitmexBookChannel>>,
/// Set once the periodic instrument refresh task has been spawned, to avoid spawning it twice.
instrument_refresh_active: bool,
}
impl BitmexDataClient {
/// Creates a new [`BitmexDataClient`] from the given client ID and configuration.
///
/// The websocket client is not created here; it is constructed lazily by `connect`.
///
/// # Errors
///
/// Returns an error if the BitMEX HTTP client cannot be constructed.
pub fn new(client_id: ClientId, config: BitmexDataClientConfig) -> anyhow::Result<Self> {
    let clock = get_atomic_clock_realtime();
    let data_sender = get_data_event_sender();

    let http_result = BitmexHttpClient::new(
        Some(config.http_base_url()),
        config.api_key.clone(),
        config.api_secret.clone(),
        config.use_testnet,
        config.http_timeout_secs,
        config.max_retries,
        config.retry_delay_initial_ms,
        config.retry_delay_max_ms,
        config.recv_window_ms,
        config.max_requests_per_second,
        config.max_requests_per_minute,
        config.http_proxy_url.clone(),
    );
    let http_client = http_result.context("failed to construct BitMEX HTTP client")?;

    // Start disconnected, with empty caches and no background tasks.
    let client = Self {
        client_id,
        clock,
        config,
        http_client,
        ws_client: None,
        is_connected: AtomicBool::new(false),
        cancellation_token: CancellationToken::new(),
        tasks: Vec::new(),
        data_sender,
        instruments: Arc::new(AtomicMap::new()),
        book_channels: Arc::new(AtomicMap::new()),
        instrument_refresh_active: false,
    };
    Ok(client)
}
/// Returns the BitMEX venue constant (`BITMEX_VENUE`).
fn venue(&self) -> Venue {
*BITMEX_VENUE
}
/// Returns a shared reference to the websocket client.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created yet
/// (it is created by `connect`).
fn ws_client(&self) -> anyhow::Result<&BitmexWebSocketClient> {
self.ws_client
.as_ref()
.context("websocket client not initialized; call connect first")
}
/// Returns a mutable reference to the websocket client.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created yet
/// (it is created by `connect`).
fn ws_client_mut(&mut self) -> anyhow::Result<&mut BitmexWebSocketClient> {
self.ws_client
.as_mut()
.context("websocket client not initialized; call connect first")
}
fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
if let Err(e) = sender.send(DataEvent::Data(data)) {
log::error!("Failed to emit data event: {e}");
}
}
fn spawn_ws<F>(&self, fut: F, context: &'static str)
where
F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
get_runtime().spawn(async move {
if let Err(e) = fut.await {
log::error!("{context}: {e:?}");
}
});
}
fn spawn_stream_task(
&mut self,
stream: impl futures_util::Stream<Item = BitmexWsMessage> + Send + 'static,
) {
let data_sender = self.data_sender.clone();
let instruments = Arc::clone(&self.instruments);
let cancellation = self.cancellation_token.clone();
let clock = self.clock;
let instruments_by_symbol: AHashMap<Ustr, InstrumentAny> = {
let guard = instruments.load();
guard
.values()
.map(|inst| (inst.symbol().inner(), inst.clone()))
.collect()
};
let handle = get_runtime().spawn(async move {
tokio::pin!(stream);
let mut quote_cache = QuoteCache::new();
let mut insts_by_symbol = instruments_by_symbol;
loop {
tokio::select! {
maybe_msg = stream.next() => {
match maybe_msg {
Some(msg) => Self::handle_ws_message(
clock.get_time_ns(),
msg,
&data_sender,
&instruments,
&mut insts_by_symbol,
&mut quote_cache,
),
None => {
log::debug!("BitMEX websocket stream ended");
break;
}
}
}
() = cancellation.cancelled() => {
log::debug!("BitMEX websocket stream task cancelled");
break;
}
}
}
});
self.tasks.push(handle);
}
fn handle_ws_message(
ts_init: UnixNanos,
message: BitmexWsMessage,
sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
quote_cache: &mut QuoteCache,
) {
match message {
BitmexWsMessage::Table(table_msg) => {
match table_msg {
BitmexTableMessage::OrderBookL2 { action, data }
| BitmexTableMessage::OrderBookL2_25 { action, data } => {
if !data.is_empty() {
let parsed =
parse_book_msg_vec(data, action, instruments_by_symbol, ts_init);
for d in parsed {
Self::send_data(sender, d);
}
}
}
BitmexTableMessage::OrderBook10 { data, .. } => {
if !data.is_empty() {
let parsed = parse_book10_msg_vec(data, instruments_by_symbol, ts_init);
for d in parsed {
Self::send_data(sender, d);
}
}
}
BitmexTableMessage::Quote { data, .. } => {
handle_quote_messages(
data,
instruments_by_symbol,
quote_cache,
ts_init,
sender,
);
}
BitmexTableMessage::Trade { data, .. } => {
if !data.is_empty() {
let parsed = parse_trade_msg_vec(data, instruments_by_symbol, ts_init);
for d in parsed {
Self::send_data(sender, d);
}
}
}
BitmexTableMessage::TradeBin1m { action, data } => {
if action != BitmexAction::Partial && !data.is_empty() {
let parsed = parse_trade_bin_msg_vec(
data,
&BitmexWsTopic::TradeBin1m,
instruments_by_symbol,
ts_init,
);
for d in parsed {
Self::send_data(sender, d);
}
}
}
BitmexTableMessage::TradeBin5m { action, data } => {
if action != BitmexAction::Partial && !data.is_empty() {
let parsed = parse_trade_bin_msg_vec(
data,
&BitmexWsTopic::TradeBin5m,
instruments_by_symbol,
ts_init,
);
for d in parsed {
Self::send_data(sender, d);
}
}
}
BitmexTableMessage::TradeBin1h { action, data } => {
if action != BitmexAction::Partial && !data.is_empty() {
let parsed = parse_trade_bin_msg_vec(
data,
&BitmexWsTopic::TradeBin1h,
instruments_by_symbol,
ts_init,
);
for d in parsed {
Self::send_data(sender, d);
}
}
}
BitmexTableMessage::TradeBin1d { action, data } => {
if action != BitmexAction::Partial && !data.is_empty() {
let parsed = parse_trade_bin_msg_vec(
data,
&BitmexWsTopic::TradeBin1d,
instruments_by_symbol,
ts_init,
);
for d in parsed {
Self::send_data(sender, d);
}
}
}
BitmexTableMessage::Instrument { action, data } => {
Self::handle_instrument_msg(
action,
data,
ts_init,
sender,
instruments,
instruments_by_symbol,
);
}
BitmexTableMessage::Funding { data, .. } => {
for msg in data {
let update = parse_funding_msg(&msg, ts_init);
log::debug!(
"Funding rate update: instrument={}, rate={}",
update.instrument_id,
update.rate,
);
if let Err(e) = sender.send(DataEvent::FundingRate(update)) {
log::error!("Failed to emit funding rate event: {e}");
}
}
}
BitmexTableMessage::Order { .. }
| BitmexTableMessage::Execution { .. }
| BitmexTableMessage::Position { .. }
| BitmexTableMessage::Wallet { .. }
| BitmexTableMessage::Margin { .. } => {
log::debug!("Ignoring trading message on data client");
}
_ => {
log::warn!("Unhandled table message type on data client");
}
}
}
BitmexWsMessage::Reconnected => {
quote_cache.clear();
log::info!("BitMEX websocket reconnected");
}
BitmexWsMessage::Authenticated => {
log::debug!("BitMEX websocket authenticated");
}
}
}
/// Handles an instrument table message from the websocket.
///
/// - `Partial` / `Insert`: converts each message to an instrument, stores the parsed
///   instruments in the shared cache and the symbol-keyed map, emits one
///   `DataEvent::Instrument` per instrument, then emits any data parsed from the
///   same messages via `parse_instrument_msg`.
/// - `Update`: emits an `InstrumentStatus` for every message whose `state` string
///   deserializes to a `BitmexInstrumentState`, then emits data parsed via
///   `parse_instrument_msg` using the existing symbol-keyed map.
/// - `Delete`: only logged; nothing is removed from the caches.
///
/// Channel send failures are logged and otherwise ignored.
fn handle_instrument_msg(
action: BitmexAction,
data: Vec<crate::websocket::messages::BitmexInstrumentMsg>,
ts_init: UnixNanos,
sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
) {
match action {
BitmexAction::Partial | BitmexAction::Insert => {
let mut new_instruments = Vec::with_capacity(data.len());
// Instruments parsed from this message only, keyed by symbol.
let mut temp_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
// `data` is consumed by `try_into` below, so keep a copy for the second pass.
let data_for_prices = data.clone();
for msg in data {
match msg.try_into() {
Ok(http_inst) => match parse_instrument_any(&http_inst, ts_init) {
InstrumentParseResult::Ok(boxed) => {
let instrument_any = *boxed;
let symbol = instrument_any.symbol().inner();
temp_cache.insert(symbol, instrument_any.clone());
new_instruments.push(instrument_any);
}
// Unsupported and inactive instruments are skipped silently.
InstrumentParseResult::Unsupported { .. }
| InstrumentParseResult::Inactive { .. } => {}
InstrumentParseResult::Failed {
symbol,
instrument_type,
error,
} => {
log::warn!(
"Failed to parse instrument {symbol} ({instrument_type:?}): {error}"
);
}
},
Err(e) => {
log::debug!("Skipping instrument (missing required fields): {e}");
}
}
}
// Add the new instruments to the shared cache (existing entries are kept).
instruments.rcu(|m| {
for inst in &new_instruments {
m.insert(inst.id(), inst.clone());
}
});
// Mirror the new instruments into the stream task's symbol-keyed map.
for (symbol, inst) in &temp_cache {
instruments_by_symbol.insert(*symbol, inst.clone());
}
// Instrument events are emitted before any data derived from the same messages.
for inst in new_instruments {
if let Err(e) = sender.send(DataEvent::Instrument(inst)) {
log::error!("Failed to send instrument event: {e}");
}
}
// Second pass resolves instruments against `temp_cache` only, i.e. the
// instruments parsed from this message.
for msg in data_for_prices {
for d in parse_instrument_msg(&msg, &temp_cache, ts_init) {
Self::send_data(sender, d);
}
}
}
BitmexAction::Update => {
for msg in &data {
// The state string is wrapped in quotes so it can be deserialized as a
// JSON string into the enum; unrecognized states are skipped.
if let Some(state_str) = &msg.state
&& let Ok(state) = serde_json::from_str::<BitmexInstrumentState>(&format!(
"\"{state_str}\""
))
{
let instrument_id = parse_instrument_id(msg.symbol);
let action = MarketStatusAction::from(&state);
// Only the `Open` state is reported as trading.
let is_trading = Some(state == BitmexInstrumentState::Open);
let ts_event = parse_optional_datetime_to_unix_nanos(
&Some(msg.timestamp),
"timestamp",
);
let status = InstrumentStatus::new(
instrument_id,
action,
ts_event,
ts_init,
None,
None,
is_trading,
None,
None,
);
if let Err(e) = sender.send(DataEvent::InstrumentStatus(status)) {
log::error!("Failed to send instrument status: {e}");
}
}
}
// Status events (above) are emitted before the data parsed from the updates.
for msg in data {
for d in parse_instrument_msg(&msg, instruments_by_symbol, ts_init) {
Self::send_data(sender, d);
}
}
}
BitmexAction::Delete => {
log::info!(
"Received instrument delete action for {} instrument(s)",
data.len(),
);
}
}
}
/// Fetches instruments over HTTP and seeds every cache with them.
///
/// The shared instrument cache is replaced with the fetched set, the HTTP client
/// cache (and the websocket client cache, when a websocket client exists) is
/// updated, and one `DataEvent::Instrument` is emitted per instrument.
///
/// Returns the fetched instruments sorted by instrument ID.
///
/// # Errors
///
/// Returns an error if the HTTP instrument request fails.
async fn bootstrap_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
    let active_only = self.config.active_only;
    let mut instruments = self
        .http_client
        .request_instruments(active_only)
        .await
        .context("failed to request BitMEX instruments")?;
    instruments.sort_by_key(|instrument| instrument.id());

    // Replace (not merge) the shared cache contents.
    self.instruments.rcu(|m| {
        m.clear();
        for instrument in &instruments {
            m.insert(instrument.id(), instrument.clone());
        }
    });

    self.http_client.cache_instruments(&instruments);
    if let Some(ws) = self.ws_client.as_ref() {
        ws.cache_instruments(&instruments);
    }

    for instrument in &instruments {
        let event = DataEvent::Instrument(instrument.clone());
        if let Err(e) = self.data_sender.send(event) {
            log::warn!(
                "Failed to send instrument event for {}: {e}",
                instrument.id()
            );
        }
    }

    Ok(instruments)
}
/// Returns the current value of the connection flag (relaxed atomic load).
fn is_connected(&self) -> bool {
self.is_connected.load(Ordering::Relaxed)
}
/// Returns the negation of `is_connected`.
fn is_disconnected(&self) -> bool {
!self.is_connected()
}
fn maybe_spawn_instrument_refresh(&mut self) {
let Some(minutes) = self.config.update_instruments_interval_mins else {
return;
};
if minutes == 0 || self.instrument_refresh_active {
return;
}
let interval_secs = minutes.saturating_mul(60);
if interval_secs == 0 {
return;
}
let interval = Duration::from_secs(interval_secs);
let cancellation = self.cancellation_token.clone();
let instruments_cache = Arc::clone(&self.instruments);
let active_only = self.config.active_only;
let client_id = self.client_id;
let http_client = self.http_client.clone();
let handle = get_runtime().spawn(async move {
let http_client = http_client;
loop {
let sleep = tokio::time::sleep(interval);
tokio::pin!(sleep);
tokio::select! {
() = cancellation.cancelled() => {
log::debug!("BitMEX instrument refresh task cancelled");
break;
}
() = &mut sleep => {
match http_client.request_instruments(active_only).await {
Ok(mut instruments) => {
instruments.sort_by_key(|instrument| instrument.id());
instruments_cache.rcu(|m| {
m.clear();
for instrument in &instruments {
m.insert(instrument.id(), instrument.clone());
}
});
http_client.cache_instruments(&instruments);
log::debug!("BitMEX instruments refreshed: client_id={client_id}");
}
Err(e) => {
log::warn!("Failed to refresh BitMEX instruments: client_id={client_id}, error={e:?}");
}
}
}
}
}
});
self.tasks.push(handle);
self.instrument_refresh_active = true;
}
}
#[async_trait::async_trait(?Send)]
impl DataClient for BitmexDataClient {
/// Returns the client ID this data client was constructed with.
fn client_id(&self) -> ClientId {
self.client_id
}
/// Returns the BitMEX venue, always `Some`.
///
/// The inner `self.venue()` call resolves to the inherent `venue` method
/// (inherent methods take precedence over trait methods), so this does not recurse.
fn venue(&self) -> Option<Venue> {
Some(self.venue())
}
/// Logs the start of the client together with its key configuration values.
///
/// No connection is made here; see `connect`.
fn start(&mut self) -> anyhow::Result<()> {
    let cfg = &self.config;
    log::info!(
        "Starting BitMEX data client: client_id={}, use_testnet={}, http_proxy_url={:?}, ws_proxy_url={:?}",
        self.client_id,
        cfg.use_testnet,
        cfg.http_proxy_url,
        cfg.ws_proxy_url,
    );
    Ok(())
}
/// Stops the client: marks it disconnected, clears the refresh-active flag and
/// cancels the cancellation token observed by the background tasks.
///
/// The websocket is not closed and task handles are not joined here.
fn stop(&mut self) -> anyhow::Result<()> {
    log::info!("Stopping BitMEX data client {id}", id = self.client_id);

    self.is_connected.store(false, Ordering::Relaxed);
    self.instrument_refresh_active = false;
    self.cancellation_token.cancel();

    Ok(())
}
/// Resets the client to its initial, disconnected state.
///
/// Any background tasks still tracked by the client are cancelled and aborted,
/// a fresh cancellation token is installed, and the book channel map and
/// refresh flag are cleared.
fn reset(&mut self) -> anyhow::Result<()> {
    log::debug!("Resetting BitMEX data client {id}", id = self.client_id);
    self.is_connected.store(false, Ordering::Relaxed);

    // Dropping a `JoinHandle` only detaches its task, and tasks hold a clone of
    // the *old* token. Without cancelling that token and aborting the handles,
    // the stream and refresh tasks would keep running after the reset with no
    // way left to stop them.
    self.cancellation_token.cancel();
    for handle in self.tasks.drain(..) {
        handle.abort();
    }

    self.cancellation_token = CancellationToken::new();
    self.book_channels.store(AHashMap::new());
    self.instrument_refresh_active = false;
    Ok(())
}
/// Disposes the client by delegating to `stop`.
fn dispose(&mut self) -> anyhow::Result<()> {
self.stop()
}
async fn connect(&mut self) -> anyhow::Result<()> {
if self.is_connected() {
return Ok(());
}
if self.ws_client.is_none() {
let ws = BitmexWebSocketClient::new_with_env(
Some(self.config.ws_url()),
self.config.api_key.clone(),
self.config.api_secret.clone(),
None,
self.config.heartbeat_interval_secs.unwrap_or(5),
self.config.use_testnet,
)
.context("failed to construct BitMEX websocket client")?;
self.ws_client = Some(ws);
}
self.bootstrap_instruments().await?;
let ws = self.ws_client_mut()?;
ws.connect()
.await
.context("failed to connect BitMEX websocket")?;
ws.wait_until_active(10.0)
.await
.context("BitMEX websocket did not become active")?;
let stream = ws.stream();
self.spawn_stream_task(stream);
self.maybe_spawn_instrument_refresh();
self.is_connected.store(true, Ordering::Relaxed);
log::info!("Connected");
Ok(())
}
async fn disconnect(&mut self) -> anyhow::Result<()> {
if self.is_disconnected() {
return Ok(());
}
self.cancellation_token.cancel();
if let Some(ws) = self.ws_client.as_mut()
&& let Err(e) = ws.close().await
{
log::warn!("Error while closing BitMEX websocket: {e:?}");
}
for handle in self.tasks.drain(..) {
if let Err(e) = handle.await {
log::error!("Error joining websocket task: {e:?}");
}
}
self.cancellation_token = CancellationToken::new();
self.is_connected.store(false, Ordering::Relaxed);
self.book_channels.store(AHashMap::new());
self.instrument_refresh_active = false;
log::info!("Disconnected");
Ok(())
}
/// Returns whether the client is connected.
///
/// The inner call resolves to the inherent `is_connected` method (inherent
/// methods take precedence over trait methods), so this does not recurse.
fn is_connected(&self) -> bool {
self.is_connected()
}
/// Returns whether the client is disconnected.
///
/// The inner call resolves to the inherent `is_disconnected` method (inherent
/// methods take precedence over trait methods), so this does not recurse.
fn is_disconnected(&self) -> bool {
self.is_disconnected()
}
/// Issues the websocket client's `subscribe_instruments` request on a background task.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the subscription itself is only logged by the spawned task.
fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let task = async move {
        client
            .subscribe_instruments()
            .await
            .map_err(|e| anyhow::anyhow!(e))
    };
    self.spawn_ws(task, "BitMEX instruments subscription");
    Ok(())
}
fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
let instrument_id = cmd.instrument_id;
if let Some(instrument) = self.instruments.load().get(&instrument_id).cloned() {
if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
log::error!("Failed to send instrument event for {instrument_id}: {e}");
}
return Ok(());
}
log::warn!("Instrument {instrument_id} not found in BitMEX cache");
let ws = self.ws_client()?.clone();
self.spawn_ws(
async move {
ws.subscribe_instrument(instrument_id)
.await
.map_err(|e| anyhow::anyhow!(e))
},
"BitMEX instrument subscription",
);
Ok(())
}
/// Subscribes to L2 order book deltas for an instrument.
///
/// A requested depth of 1 to 25 selects the `OrderBookL2_25` channel (with an
/// info log when the depth is not exactly 25); no depth, or a depth above 25,
/// selects the full `OrderBookL2` channel. The chosen channel is recorded in
/// `book_channels` once the subscription succeeds.
///
/// # Errors
///
/// Returns an error if the book type is not `L2_MBP` or the websocket client
/// has not been created. A failure of the subscription itself is only logged
/// by the spawned task.
fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
    if cmd.book_type != BookType::L2_MBP {
        anyhow::bail!("BitMEX only supports L2_MBP order book deltas");
    }

    let instrument_id = cmd.instrument_id;
    let channel = match cmd.depth.map(|d| d.get()) {
        Some(depth @ 1..=25) => {
            if depth != 25 {
                log::info!(
                    "BitMEX only supports depth 25 for L2 deltas, using L2_25 for requested depth {depth}"
                );
            }
            BitmexBookChannel::OrderBookL2_25
        }
        _ => BitmexBookChannel::OrderBookL2,
    };

    let client = self.ws_client()?.clone();
    let channels = Arc::clone(&self.book_channels);
    self.spawn_ws(
        async move {
            match channel {
                BitmexBookChannel::OrderBookL2 => client
                    .subscribe_book(instrument_id)
                    .await
                    .map_err(|e| anyhow::anyhow!(e))?,
                BitmexBookChannel::OrderBookL2_25 => client
                    .subscribe_book_25(instrument_id)
                    .await
                    .map_err(|e| anyhow::anyhow!(e))?,
                // `channel` is only ever one of the two L2 variants chosen above.
                BitmexBookChannel::OrderBook10 => unreachable!(),
            }
            // Record the channel only after a successful subscription.
            channels.insert(instrument_id, channel);
            Ok(())
        },
        "BitMEX book delta subscription",
    );
    Ok(())
}
/// Subscribes to the depth-10 order book channel for an instrument and, once
/// the subscription succeeds, records `OrderBook10` in `book_channels`.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the subscription itself is only logged by the spawned task.
fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let channels = Arc::clone(&self.book_channels);
    let target = cmd.instrument_id;
    self.spawn_ws(
        async move {
            client
                .subscribe_book_depth10(target)
                .await
                .map_err(|e| anyhow::anyhow!(e))?;
            channels.insert(target, BitmexBookChannel::OrderBook10);
            Ok(())
        },
        "BitMEX book depth10 subscription",
    );
    Ok(())
}
/// Requests a websocket quote subscription for an instrument on a background task.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the subscription itself is only logged by the spawned task.
fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let target = cmd.instrument_id;
    let task = async move {
        client
            .subscribe_quotes(target)
            .await
            .map_err(|e| anyhow::anyhow!(e))
    };
    self.spawn_ws(task, "BitMEX quote subscription");
    Ok(())
}
/// Requests a websocket trade subscription for an instrument on a background task.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the subscription itself is only logged by the spawned task.
fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let target = cmd.instrument_id;
    let task = async move {
        client
            .subscribe_trades(target)
            .await
            .map_err(|e| anyhow::anyhow!(e))
    };
    self.spawn_ws(task, "BitMEX trade subscription");
    Ok(())
}
/// Requests a websocket mark price subscription for an instrument on a background task.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the subscription itself is only logged by the spawned task.
fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let target = cmd.instrument_id;
    let task = async move {
        client
            .subscribe_mark_prices(target)
            .await
            .map_err(|e| anyhow::anyhow!(e))
    };
    self.spawn_ws(task, "BitMEX mark price subscription");
    Ok(())
}
/// Requests a websocket index price subscription for an instrument on a background task.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the subscription itself is only logged by the spawned task.
fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let target = cmd.instrument_id;
    let task = async move {
        client
            .subscribe_index_prices(target)
            .await
            .map_err(|e| anyhow::anyhow!(e))
    };
    self.spawn_ws(task, "BitMEX index price subscription");
    Ok(())
}
/// Requests a websocket funding rate subscription for an instrument on a background task.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the subscription itself is only logged by the spawned task.
fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let target = cmd.instrument_id;
    let task = async move {
        client
            .subscribe_funding_rates(target)
            .await
            .map_err(|e| anyhow::anyhow!(e))
    };
    self.spawn_ws(task, "BitMEX funding rate subscription");
    Ok(())
}
/// Requests a websocket bar subscription for the given bar type on a background task.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the subscription itself is only logged by the spawned task.
fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let requested = cmd.bar_type;
    let task = async move {
        client
            .subscribe_bars(requested)
            .await
            .map_err(|e| anyhow::anyhow!(e))
    };
    self.spawn_ws(task, "BitMEX bar subscription");
    Ok(())
}
/// Subscribes to instrument status for an instrument.
///
/// This uses the websocket instrument subscription (`subscribe_instrument`);
/// status events are derived from instrument update messages.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the subscription itself is only logged by the spawned task.
fn subscribe_instrument_status(
    &mut self,
    cmd: &SubscribeInstrumentStatus,
) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let target = cmd.instrument_id;
    let task = async move {
        client
            .subscribe_instrument(target)
            .await
            .map_err(|e| anyhow::anyhow!(e))
    };
    self.spawn_ws(task, "BitMEX instrument status subscription");
    Ok(())
}
/// Unsubscribes from instrument status by dropping the websocket instrument
/// subscription (`unsubscribe_instrument`) on a background task.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the unsubscribe itself is only logged by the spawned task.
fn unsubscribe_instrument_status(
    &mut self,
    cmd: &UnsubscribeInstrumentStatus,
) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let target = cmd.instrument_id;
    let task = async move {
        client
            .unsubscribe_instrument(target)
            .await
            .map_err(|e| anyhow::anyhow!(e))
    };
    self.spawn_ws(task, "BitMEX instrument status unsubscribe");
    Ok(())
}
/// Unsubscribes from order book deltas for an instrument.
///
/// The channel recorded in `book_channels` at subscribe time decides which
/// websocket unsubscribe is issued; when nothing is recorded, the full
/// `OrderBookL2` channel is assumed. The recorded entry is removed before the
/// unsubscribe request is sent.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the unsubscribe itself is only logged by the spawned task.
fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let channels = Arc::clone(&self.book_channels);
    let target = cmd.instrument_id;
    self.spawn_ws(
        async move {
            let recorded = channels.load().get(&target).copied();
            channels.remove(&target);
            match recorded {
                // No record falls back to the full L2 channel.
                Some(BitmexBookChannel::OrderBookL2) | None => client
                    .unsubscribe_book(target)
                    .await
                    .map_err(|e| anyhow::anyhow!(e))?,
                Some(BitmexBookChannel::OrderBookL2_25) => client
                    .unsubscribe_book_25(target)
                    .await
                    .map_err(|e| anyhow::anyhow!(e))?,
                Some(BitmexBookChannel::OrderBook10) => client
                    .unsubscribe_book_depth10(target)
                    .await
                    .map_err(|e| anyhow::anyhow!(e))?,
            }
            Ok(())
        },
        "BitMEX book delta unsubscribe",
    );
    Ok(())
}
/// Unsubscribes from the depth-10 order book channel for an instrument.
///
/// The instrument's entry in `book_channels` is removed before the unsubscribe
/// request is sent.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the unsubscribe itself is only logged by the spawned task.
fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let channels = Arc::clone(&self.book_channels);
    let target = cmd.instrument_id;
    let task = async move {
        channels.remove(&target);
        client
            .unsubscribe_book_depth10(target)
            .await
            .map_err(|e| anyhow::anyhow!(e))
    };
    self.spawn_ws(task, "BitMEX book depth10 unsubscribe");
    Ok(())
}
/// Requests a websocket quote unsubscribe for an instrument on a background task.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the unsubscribe itself is only logged by the spawned task.
fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let target = cmd.instrument_id;
    let task = async move {
        client
            .unsubscribe_quotes(target)
            .await
            .map_err(|e| anyhow::anyhow!(e))
    };
    self.spawn_ws(task, "BitMEX quote unsubscribe");
    Ok(())
}
/// Requests a websocket trade unsubscribe for an instrument on a background task.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the unsubscribe itself is only logged by the spawned task.
fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let target = cmd.instrument_id;
    let task = async move {
        client
            .unsubscribe_trades(target)
            .await
            .map_err(|e| anyhow::anyhow!(e))
    };
    self.spawn_ws(task, "BitMEX trade unsubscribe");
    Ok(())
}
/// Requests a websocket mark price unsubscribe for an instrument on a background task.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the unsubscribe itself is only logged by the spawned task.
fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let target = cmd.instrument_id;
    let task = async move {
        client
            .unsubscribe_mark_prices(target)
            .await
            .map_err(|e| anyhow::anyhow!(e))
    };
    self.spawn_ws(task, "BitMEX mark price unsubscribe");
    Ok(())
}
/// Requests a websocket index price unsubscribe for an instrument on a background task.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the unsubscribe itself is only logged by the spawned task.
fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let target = cmd.instrument_id;
    let task = async move {
        client
            .unsubscribe_index_prices(target)
            .await
            .map_err(|e| anyhow::anyhow!(e))
    };
    self.spawn_ws(task, "BitMEX index price unsubscribe");
    Ok(())
}
/// Requests a websocket funding rate unsubscribe for an instrument on a background task.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the unsubscribe itself is only logged by the spawned task.
fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let target = cmd.instrument_id;
    let task = async move {
        client
            .unsubscribe_funding_rates(target)
            .await
            .map_err(|e| anyhow::anyhow!(e))
    };
    self.spawn_ws(task, "BitMEX funding rate unsubscribe");
    Ok(())
}
/// Requests a websocket bar unsubscribe for the given bar type on a background task.
///
/// # Errors
///
/// Returns an error if the websocket client has not been created. A failure of
/// the unsubscribe itself is only logged by the spawned task.
fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
    let client = self.ws_client()?.clone();
    let requested = cmd.bar_type;
    let task = async move {
        client
            .unsubscribe_bars(requested)
            .await
            .map_err(|e| anyhow::anyhow!(e))
    };
    self.spawn_ws(task, "BitMEX bar unsubscribe");
    Ok(())
}
fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
if let Some(req_venue) = request.venue
&& req_venue != self.venue()
{
log::warn!("Ignoring mismatched venue in instruments request: {req_venue}");
}
let venue = self.venue();
let http = self.http_client.clone();
let instruments_cache = Arc::clone(&self.instruments);
let sender = self.data_sender.clone();
let request_id = request.request_id;
let client_id = request.client_id.unwrap_or(self.client_id);
let params = request.params;
let start_nanos = datetime_to_unix_nanos(request.start);
let end_nanos = datetime_to_unix_nanos(request.end);
let clock = self.clock;
let active_only = self.config.active_only;
get_runtime().spawn(async move {
let http_client = http;
match http_client
.request_instruments(active_only)
.await
.context("failed to request instruments from BitMEX")
{
Ok(instruments) => {
instruments_cache.rcu(|m| {
m.clear();
for instrument in &instruments {
m.insert(instrument.id(), instrument.clone());
}
});
http_client.cache_instruments(&instruments);
let response = DataResponse::Instruments(InstrumentsResponse::new(
request_id,
client_id,
venue,
instruments,
start_nanos,
end_nanos,
clock.get_time_ns(),
params,
));
if let Err(e) = sender.send(DataEvent::Response(response)) {
log::error!("Failed to send instruments response: {e}");
}
}
Err(e) => log::error!("Instrument request failed: {e:?}"),
}
});
Ok(())
}
fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
if let Some(instrument) = self.instruments.load().get(&request.instrument_id).cloned() {
let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
request.request_id,
request.client_id.unwrap_or(self.client_id),
instrument.id(),
instrument,
datetime_to_unix_nanos(request.start),
datetime_to_unix_nanos(request.end),
self.clock.get_time_ns(),
request.params,
)));
if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
log::error!("Failed to send instrument response: {e}");
}
return Ok(());
}
let http_client = self.http_client.clone();
let instruments_cache = Arc::clone(&self.instruments);
let sender = self.data_sender.clone();
let instrument_id = request.instrument_id;
let request_id = request.request_id;
let client_id = request.client_id.unwrap_or(self.client_id);
let start = request.start;
let end = request.end;
let params = request.params;
let clock = self.clock;
get_runtime().spawn(async move {
match http_client
.request_instrument(instrument_id)
.await
.context("failed to request instrument from BitMEX")
{
Ok(Some(instrument)) => {
http_client.cache_instrument(instrument.clone());
instruments_cache.insert(instrument.id(), instrument.clone());
let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
request_id,
client_id,
instrument.id(),
instrument,
datetime_to_unix_nanos(start),
datetime_to_unix_nanos(end),
clock.get_time_ns(),
params,
)));
if let Err(e) = sender.send(DataEvent::Response(response)) {
log::error!("Failed to send instrument response: {e}");
}
}
Ok(None) => log::warn!("BitMEX instrument {instrument_id} not found"),
Err(e) => log::error!("Instrument request failed: {e:?}"),
}
});
Ok(())
}
/// Requests historical trades over HTTP on a background task.
///
/// On success a `TradesResponse` is emitted on the data channel; a failed
/// request is logged and no response is emitted.
///
/// A `limit` larger than `u32::MAX` is clamped to `u32::MAX`.
fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
    let http = self.http_client.clone();
    let sender = self.data_sender.clone();
    let instrument_id = request.instrument_id;
    let start = request.start;
    let end = request.end;
    // Saturate instead of `as u32`: a plain cast silently truncates large
    // limits (e.g. 2^32 would become 0) and changes the request's meaning.
    let limit = request
        .limit
        .map(|n| u32::try_from(n.get()).unwrap_or(u32::MAX));
    let request_id = request.request_id;
    let client_id = request.client_id.unwrap_or(self.client_id);
    let params = request.params;
    let clock = self.clock;
    let start_nanos = datetime_to_unix_nanos(start);
    let end_nanos = datetime_to_unix_nanos(end);

    get_runtime().spawn(async move {
        match http
            .request_trades(instrument_id, start, end, limit)
            .await
            .context("failed to request trades from BitMEX")
        {
            Ok(trades) => {
                let response = DataResponse::Trades(TradesResponse::new(
                    request_id,
                    client_id,
                    instrument_id,
                    trades,
                    start_nanos,
                    end_nanos,
                    clock.get_time_ns(),
                    params,
                ));
                if let Err(e) = sender.send(DataEvent::Response(response)) {
                    log::error!("Failed to send trades response: {e}");
                }
            }
            Err(e) => log::error!("Trade request failed: {e:?}"),
        }
    });

    Ok(())
}
/// Requests historical bars over HTTP on a background task.
///
/// On success a `BarsResponse` is emitted on the data channel; a failed request
/// is logged and no response is emitted.
///
/// A `limit` larger than `u32::MAX` is clamped to `u32::MAX`.
fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
    let http = self.http_client.clone();
    let sender = self.data_sender.clone();
    let bar_type = request.bar_type;
    let start = request.start;
    let end = request.end;
    // Saturate instead of `as u32`: a plain cast silently truncates large
    // limits (e.g. 2^32 would become 0) and changes the request's meaning.
    let limit = request
        .limit
        .map(|n| u32::try_from(n.get()).unwrap_or(u32::MAX));
    let request_id = request.request_id;
    let client_id = request.client_id.unwrap_or(self.client_id);
    let params = request.params;
    let clock = self.clock;
    let start_nanos = datetime_to_unix_nanos(start);
    let end_nanos = datetime_to_unix_nanos(end);

    get_runtime().spawn(async move {
        match http
            .request_bars(bar_type, start, end, limit, false)
            .await
            .context("failed to request bars from BitMEX")
        {
            Ok(bars) => {
                let response = DataResponse::Bars(BarsResponse::new(
                    request_id,
                    client_id,
                    bar_type,
                    bars,
                    start_nanos,
                    end_nanos,
                    clock.get_time_ns(),
                    params,
                ));
                if let Err(e) = sender.send(DataEvent::Response(response)) {
                    log::error!("Failed to send bars response: {e}");
                }
            }
            Err(e) => log::error!("Bar request failed: {e:?}"),
        }
    });

    Ok(())
}
}
/// Converts websocket quote messages into quote data events.
///
/// Each message is resolved against `instruments_by_symbol`; a message for an
/// unknown symbol is logged and dropped. The sides present in the message are
/// passed through `quote_cache`, and the resulting quote is emitted on
/// `sender`. Cache processing failures and send failures are logged.
fn handle_quote_messages(
    data: Vec<BitmexQuoteMsg>,
    instruments_by_symbol: &AHashMap<Ustr, InstrumentAny>,
    quote_cache: &mut QuoteCache,
    ts_init: UnixNanos,
    sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
) {
    for msg in data {
        let instrument = match instruments_by_symbol.get(&msg.symbol) {
            Some(found) => found,
            None => {
                log::error!(
                    "Instrument cache miss: quote dropped for symbol={}",
                    msg.symbol,
                );
                continue;
            }
        };

        let price_precision = instrument.price_precision();
        let to_price = |p| Price::new(p, price_precision);
        let to_quantity = |s| parse_contracts_quantity(s, instrument);

        let processed = quote_cache.process(
            instrument.id(),
            msg.bid_price.map(to_price),
            msg.ask_price.map(to_price),
            msg.bid_size.map(to_quantity),
            msg.ask_size.map(to_quantity),
            UnixNanos::from(msg.timestamp),
            ts_init,
        );

        match processed {
            Ok(quote) => {
                let event = DataEvent::Data(Data::Quote(quote));
                if let Err(e) = sender.send(event) {
                    log::error!("Failed to emit data event: {e}");
                }
            }
            Err(e) => {
                log::warn!("Failed to process quote for {}: {e}", msg.symbol);
            }
        }
    }
}