use std::{
collections::HashMap,
num::NonZeroU32,
sync::{
Arc, LazyLock, RwLock,
atomic::{AtomicBool, Ordering},
},
};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use nautilus_core::{
AtomicMap, AtomicTime, UUID4, UnixNanos,
consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
env::get_or_env_var_opt,
time::get_atomic_clock_realtime,
};
use nautilus_model::{
data::{Bar, BarType, TradeTick},
enums::{
AccountType, AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType,
PriceType, TimeInForce, TrailingOffsetType, TriggerType,
},
events::AccountState,
identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, Symbol, VenueOrderId},
instruments::{Instrument as InstrumentTrait, InstrumentAny},
reports::{FillReport, OrderStatusReport, PositionStatusReport},
types::{MarginBalance, Money, Price, Quantity},
};
use nautilus_network::{
http::{HttpClient, Method, StatusCode, USER_AGENT},
ratelimiter::quota::Quota,
retry::{RetryConfig, RetryManager},
};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use serde_json::Value;
use tokio_util::sync::CancellationToken;
use ustr::Ustr;
use super::{
error::{BitmexErrorResponse, BitmexHttpError},
models::{
BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
},
query::{
DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder,
PostCancelAllAfterParams, PostOrderParams, PostPositionLeverageParams, PutOrderParams,
},
};
use crate::{
common::{
consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL, BITMEX_VENUE},
credential::{Credential, credential_env_vars},
enums::{
BitmexContingencyType, BitmexExecInstruction, BitmexOrderStatus, BitmexOrderType,
BitmexPegPriceType, BitmexSide, BitmexTimeInForce,
},
parse::{
bitmex_currency_divisor, map_bitmex_currency, parse_account_balance, quantity_to_u32,
},
},
http::{
parse::{
InstrumentParseResult, parse_fill_report, parse_instrument_any,
parse_order_status_report, parse_position_report, parse_trade, parse_trade_bin,
},
query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
},
websocket::messages::BitmexMarginMsg,
};
/// Per-second request quota used by the `Default` constructors and as the
/// fallback when a caller passes zero for `max_requests_per_second`.
const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
/// Per-minute request quota used as the fallback when a caller passes zero
/// for `max_requests_per_minute`.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
/// Per-minute request quota passed by the `Default` client constructors.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;
/// Rate limiter key under which the per-second quota is registered.
const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
/// Rate limiter key under which the per-minute quota is registered.
const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";
/// Interned rate limiter keys, built once on first access and passed along
/// with every request issued through `send_request`.
static RATE_LIMIT_KEYS: LazyLock<Vec<Ustr>> = LazyLock::new(|| {
vec![
Ustr::from(BITMEX_GLOBAL_RATE_KEY),
Ustr::from(BITMEX_MINUTE_RATE_KEY),
]
});
/// Generic (de)serializable envelope that carries a list of items under a
/// `data` field.
#[derive(Debug, Serialize, Deserialize)]
pub struct BitmexResponse<T> {
/// The items carried by the envelope.
pub data: Vec<T>,
}
/// Low-level HTTP client for the BitMEX REST API.
///
/// Builds request URLs from `base_url`, optionally signs requests with the
/// configured credential, and runs every request through the retry manager
/// with a cancellation token.
#[derive(Debug, Clone)]
pub struct BitmexRawHttpClient {
/// Prefix prepended to every endpoint when building the request URL.
base_url: String,
/// Underlying HTTP client, constructed with the rate limiter quotas.
client: HttpClient,
/// API credential used for signing; `None` for unauthenticated clients.
credential: Option<Credential>,
/// Validity window in milliseconds; converted to whole seconds and added
/// to the current time to produce the `api-expires` header.
recv_window_ms: u64,
/// Retry policy applied to every request.
retry_manager: RetryManager<BitmexHttpError>,
/// Current cancellation token; behind an `Arc`, so clones of this client
/// share the same token slot.
cancellation_token: Arc<RwLock<CancellationToken>>,
}
impl Default for BitmexRawHttpClient {
    /// Creates an unauthenticated client against the default base URL with a
    /// 60 second timeout, 3 retries, and the default rate limits.
    ///
    /// # Panics
    ///
    /// Panics if the underlying client cannot be constructed.
    fn default() -> Self {
        Self::new(
            None,
            60,
            3,
            1000,
            10_000,
            10_000,
            BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND,
            BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED,
            None,
        )
        // The message names the actual type so a panic points at the right place.
        .expect("Failed to create default BitmexRawHttpClient")
    }
}
impl BitmexRawHttpClient {
/// Creates a new unauthenticated [`BitmexRawHttpClient`].
///
/// When `base_url` is `None`, `BITMEX_HTTP_URL` is used. A zero rate limit
/// falls back to the corresponding default quota.
///
/// # Errors
///
/// Returns an error if a rate limit quota is invalid or the underlying HTTP
/// client cannot be created.
#[allow(clippy::too_many_arguments)]
pub fn new(
    base_url: Option<String>,
    timeout_secs: u64,
    max_retries: u32,
    retry_delay_ms: u64,
    retry_delay_max_ms: u64,
    recv_window_ms: u64,
    max_requests_per_second: u32,
    max_requests_per_minute: u32,
    proxy_url: Option<String>,
) -> Result<Self, BitmexHttpError> {
    let retry_config = RetryConfig {
        max_retries,
        initial_delay_ms: retry_delay_ms,
        max_delay_ms: retry_delay_max_ms,
        backoff_factor: 2.0,
        jitter_ms: 1000,
        operation_timeout_ms: Some(60_000),
        immediate_first: false,
        max_elapsed_ms: Some(180_000),
    };
    let retry_manager = RetryManager::new(retry_config);
    Ok(Self {
        // Lazily build the default URL so no string is allocated when the
        // caller supplied one.
        base_url: base_url.unwrap_or_else(|| BITMEX_HTTP_URL.to_string()),
        client: HttpClient::new(
            Self::default_headers(),
            vec![],
            Self::rate_limiter_quotas(max_requests_per_second, max_requests_per_minute)?,
            Some(Self::default_quota(max_requests_per_second)?),
            Some(timeout_secs),
            proxy_url,
        )
        .map_err(|e| {
            BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
        })?,
        credential: None,
        recv_window_ms,
        retry_manager,
        cancellation_token: Arc::new(RwLock::new(CancellationToken::new())),
    })
}
/// Creates a new [`BitmexRawHttpClient`] that signs requests with the given
/// API key and secret.
///
/// # Errors
///
/// Returns an error if a rate limit quota is invalid or the underlying HTTP
/// client cannot be created.
#[allow(clippy::too_many_arguments)]
pub fn with_credentials(
    api_key: String,
    api_secret: String,
    base_url: String,
    timeout_secs: u64,
    max_retries: u32,
    retry_delay_ms: u64,
    retry_delay_max_ms: u64,
    recv_window_ms: u64,
    max_requests_per_second: u32,
    max_requests_per_minute: u32,
    proxy_url: Option<String>,
) -> Result<Self, BitmexHttpError> {
    let retry_manager = RetryManager::new(RetryConfig {
        max_retries,
        initial_delay_ms: retry_delay_ms,
        max_delay_ms: retry_delay_max_ms,
        backoff_factor: 2.0,
        jitter_ms: 1000,
        operation_timeout_ms: Some(60_000),
        immediate_first: false,
        max_elapsed_ms: Some(180_000),
    });

    // Quotas are validated before the HTTP client is built.
    let keyed_quotas = Self::rate_limiter_quotas(max_requests_per_second, max_requests_per_minute)?;
    let fallback_quota = Self::default_quota(max_requests_per_second)?;
    let client = match HttpClient::new(
        Self::default_headers(),
        vec![],
        keyed_quotas,
        Some(fallback_quota),
        Some(timeout_secs),
        proxy_url,
    ) {
        Ok(client) => client,
        Err(e) => {
            return Err(BitmexHttpError::NetworkError(format!(
                "Failed to create HTTP client: {e}"
            )));
        }
    };

    Ok(Self {
        base_url,
        client,
        credential: Some(Credential::new(api_key, api_secret)),
        recv_window_ms,
        retry_manager,
        cancellation_token: Arc::new(RwLock::new(CancellationToken::new())),
    })
}
/// Returns the headers sent with every request: a single user-agent entry.
fn default_headers() -> HashMap<String, String> {
    let mut headers = HashMap::new();
    headers.insert(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
    headers
}
/// Builds the per-second quota, substituting the default burst when
/// `max_requests_per_second` is zero.
///
/// # Errors
///
/// Returns a validation error if the quota cannot be constructed for the
/// requested burst.
fn default_quota(max_requests_per_second: u32) -> Result<Quota, BitmexHttpError> {
    let burst = match NonZeroU32::new(max_requests_per_second) {
        Some(value) => value,
        None => NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).expect("non-zero"),
    };
    match Quota::per_second(burst) {
        Some(quota) => Ok(quota),
        None => Err(BitmexHttpError::ValidationError(format!(
            "Invalid max_requests_per_second: {max_requests_per_second} exceeds maximum"
        ))),
    }
}
/// Builds the keyed quotas registered with the HTTP client: the per-second
/// quota under the global key and the per-minute quota under the minute key.
///
/// A zero `max_requests_per_minute` falls back to the authenticated default.
///
/// # Errors
///
/// Returns an error if the per-second quota is invalid.
fn rate_limiter_quotas(
    max_requests_per_second: u32,
    max_requests_per_minute: u32,
) -> Result<Vec<(String, Quota)>, BitmexHttpError> {
    let per_second = Self::default_quota(max_requests_per_second)?;
    let minute_burst = match NonZeroU32::new(max_requests_per_minute) {
        Some(burst) => burst,
        None => NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED)
            .expect("non-zero"),
    };
    let per_minute = Quota::per_minute(minute_burst);

    let mut quotas = Vec::with_capacity(2);
    quotas.push((BITMEX_GLOBAL_RATE_KEY.to_string(), per_second));
    quotas.push((BITMEX_MINUTE_RATE_KEY.to_string(), per_minute));
    Ok(quotas)
}
/// Returns a fresh copy of the interned rate limiter keys.
fn rate_limit_keys() -> Vec<Ustr> {
    let keys: &Vec<Ustr> = &RATE_LIMIT_KEYS;
    keys.clone()
}
/// Cancels the current cancellation token.
///
/// # Panics
///
/// Panics if the token lock is poisoned.
pub fn cancel_all_requests(&self) {
    let token = self
        .cancellation_token
        .read()
        .expect("cancellation token lock poisoned");
    token.cancel();
}
/// Replaces the stored cancellation token with a fresh one.
///
/// # Panics
///
/// Panics if the token lock is poisoned.
pub fn reset_cancellation_token(&self) {
    let fresh = CancellationToken::new();
    let mut slot = self
        .cancellation_token
        .write()
        .expect("cancellation token lock poisoned");
    *slot = fresh;
}
pub fn cancellation_token(&self) -> CancellationToken {
self.cancellation_token
.read()
.expect("cancellation token lock poisoned")
.clone()
}
/// Builds the authentication headers for a request.
///
/// The signed path is `endpoint` prefixed with `/api/v1` unless it already
/// starts with that prefix. The expiry is the current Unix time in seconds
/// plus the receive window truncated to whole seconds. A body that is absent
/// or not valid UTF-8 is signed as the empty string.
///
/// # Errors
///
/// Returns [`BitmexHttpError::MissingCredentials`] if no credential is set.
fn sign_request(
    &self,
    method: &Method,
    endpoint: &str,
    body: Option<&[u8]>,
) -> Result<HashMap<String, String>, BitmexHttpError> {
    let Some(credential) = self.credential.as_ref() else {
        return Err(BitmexHttpError::MissingCredentials);
    };

    let window_secs = (self.recv_window_ms / 1000) as i64;
    let expires = Utc::now().timestamp() + window_secs;

    let body_str = match body.map(std::str::from_utf8) {
        Some(Ok(text)) => text,
        _ => "",
    };

    let full_path = if endpoint.starts_with("/api/v1") {
        endpoint.to_string()
    } else {
        format!("/api/v1{endpoint}")
    };
    let signature = credential.sign(method.as_str(), &full_path, expires, body_str);

    let mut headers = HashMap::from([
        ("api-expires".to_string(), expires.to_string()),
        ("api-key".to_string(), credential.api_key().to_string()),
        ("api-signature".to_string(), signature),
    ]);

    // Bodies are form-encoded, so declare that for body-carrying methods.
    let carries_body =
        body.is_some() && [Method::POST, Method::PUT, Method::DELETE].contains(method);
    if carries_body {
        headers.insert(
            "Content-Type".to_string(),
            "application/x-www-form-urlencoded".to_string(),
        );
    }
    Ok(headers)
}
/// Sends a request to `endpoint` and deserializes the JSON response into `T`.
///
/// For `GET` and `DELETE`, `params` is URL-encoded into the query string; for
/// other methods `params` is ignored and only `body` is sent. When
/// `authenticate` is true the request is signed on every attempt, so each
/// retry gets a fresh expiry.
///
/// Retries happen on network errors, on unexpected statuses >= 500 or 429,
/// and on BitMEX errors that indicate rate limiting.
///
/// # Errors
///
/// Returns an error if params cannot be serialized, signing fails, the
/// request fails after retries, the response is not successful, the body
/// cannot be deserialized, or the request is canceled.
async fn send_request<T: DeserializeOwned, P: Serialize>(
    &self,
    method: Method,
    endpoint: &str,
    params: Option<&P>,
    body: Option<Vec<u8>>,
    authenticate: bool,
) -> Result<T, BitmexHttpError> {
    let endpoint = endpoint.to_string();
    let params_str = if method == Method::GET || method == Method::DELETE {
        params
            .map(serde_urlencoded::to_string)
            .transpose()
            .map_err(|e| {
                BitmexHttpError::JsonError(format!("Failed to serialize params: {e}"))
            })?
    } else {
        None
    };
    let full_endpoint = match params_str {
        Some(ref query) if !query.is_empty() => format!("{endpoint}?{query}"),
        _ => endpoint.clone(),
    };
    let url = format!("{}{}", self.base_url, full_endpoint);

    // The closure borrows `method` and `body` and clones them per attempt,
    // so no extra up-front copies are needed.
    let operation = || {
        let url = url.clone();
        let method = method.clone();
        let body = body.clone();
        let full_endpoint = full_endpoint.clone();
        async move {
            let headers = if authenticate {
                Some(self.sign_request(&method, &full_endpoint, body.as_deref())?)
            } else {
                None
            };
            let rate_keys = Self::rate_limit_keys();
            let resp = self
                .client
                .request_with_ustr_keys(method, url, None, headers, body, None, Some(rate_keys))
                .await?;
            if resp.status.is_success() {
                serde_json::from_slice(&resp.body).map_err(Into::into)
            } else if let Ok(error_resp) =
                serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
            {
                // A structured BitMEX error body takes precedence over the raw status.
                Err(error_resp.into())
            } else {
                Err(BitmexHttpError::UnexpectedStatus {
                    status: StatusCode::from_u16(resp.status.as_u16())
                        .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
                    body: String::from_utf8_lossy(&resp.body).to_string(),
                })
            }
        }
    };

    let should_retry = |error: &BitmexHttpError| -> bool {
        match error {
            BitmexHttpError::NetworkError(_) => true,
            BitmexHttpError::UnexpectedStatus { status, .. } => {
                status.as_u16() >= 500 || status.as_u16() == 429
            }
            BitmexHttpError::BitmexError {
                error_name,
                message,
            } => {
                error_name == "RateLimitError"
                    || (error_name == "HTTPError"
                        && message.to_lowercase().contains("rate limit"))
            }
            _ => false,
        }
    };

    // The retry manager reports failures as strings; "canceled" is mapped to
    // the dedicated cancellation error.
    let create_error = |msg: String| -> BitmexHttpError {
        if msg == "canceled" {
            BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
        } else {
            BitmexHttpError::NetworkError(msg)
        }
    };

    let cancel_token = self.cancellation_token();
    self.retry_manager
        .execute_with_retry_with_cancel(
            endpoint.as_str(),
            operation,
            should_retry,
            create_error,
            &cancel_token,
        )
        .await
}
/// Fetches instruments without authentication, from `/instrument/active`
/// when `active_only` is true and from `/instrument` otherwise.
///
/// # Errors
///
/// Returns an error if the request fails or the response cannot be parsed.
pub async fn get_instruments(
    &self,
    active_only: bool,
) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
    let path = match active_only {
        true => "/instrument/active",
        false => "/instrument",
    };
    self.send_request::<Vec<BitmexInstrument>, ()>(Method::GET, path, None, None, false)
        .await
}
/// Requests the API root without authentication and returns the `timestamp`
/// field of the response.
///
/// # Errors
///
/// Returns an error if the request fails or the response cannot be parsed.
pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
    let info = self
        .send_request::<BitmexApiInfo, ()>(Method::GET, "", None, None, false)
        .await?;
    Ok(info.timestamp)
}
/// Fetches the instrument matching `symbol` without authentication.
///
/// Returns the first instrument in the response, or `None` if the response
/// is empty.
///
/// # Errors
///
/// Returns an error if the request fails or the response cannot be parsed.
pub async fn get_instrument(
    &self,
    symbol: &str,
) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
    let path = format!("/instrument?symbol={symbol}");
    let mut matches = self
        .send_request::<Vec<BitmexInstrument>, ()>(Method::GET, &path, None, None, false)
        .await?
        .into_iter();
    Ok(matches.next())
}
/// Fetches the user wallet with an authenticated `GET /user/wallet`.
///
/// # Errors
///
/// Returns an error if credentials are missing, the request fails, or the
/// response cannot be parsed.
pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
    self.send_request::<BitmexWallet, ()>(Method::GET, "/user/wallet", None, None, true)
        .await
}
/// Fetches margin data for `currency` with an authenticated request.
///
/// # Errors
///
/// Returns an error if credentials are missing, the request fails, or the
/// response cannot be parsed.
pub async fn get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
    let path = ["/user/margin?currency=", currency].concat();
    self.send_request::<BitmexMargin, ()>(Method::GET, path.as_str(), None, None, true)
        .await
}
/// Fetches margin data for all currencies with an authenticated request.
///
/// # Errors
///
/// Returns an error if credentials are missing, the request fails, or the
/// response cannot be parsed.
pub async fn get_all_margins(&self) -> Result<Vec<BitmexMargin>, BitmexHttpError> {
    let endpoint = "/user/margin?currency=all";
    self.send_request::<Vec<BitmexMargin>, ()>(Method::GET, endpoint, None, None, true)
        .await
}
/// Fetches trades with an authenticated `GET /trade`, passing `params` as
/// the query string.
///
/// # Errors
///
/// Returns an error if the request fails or the response cannot be parsed.
pub async fn get_trades(
    &self,
    params: GetTradeParams,
) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
    self.send_request(Method::GET, "/trade", Some(&params), None, true)
        .await
}
/// Fetches bucketed trades with an authenticated `GET /trade/bucketed`,
/// passing `params` as the query string.
///
/// # Errors
///
/// Returns an error if the request fails or the response cannot be parsed.
pub async fn get_trade_bucketed(
    &self,
    params: GetTradeBucketedParams,
) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
    self.send_request(Method::GET, "/trade/bucketed", Some(&params), None, true)
        .await
}
/// Fetches orders with an authenticated `GET /order`, passing `params` as
/// the query string.
///
/// # Errors
///
/// Returns an error if the request fails or the response cannot be parsed.
pub async fn get_orders(
    &self,
    params: GetOrderParams,
) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
    self.send_request(Method::GET, "/order", Some(&params), None, true)
        .await
}
/// Places an order with an authenticated `POST /order`, sending `params` as
/// a form-encoded body.
///
/// # Errors
///
/// Returns a validation error if `params` cannot be serialized, or an error
/// if the request fails or the response cannot be parsed.
pub async fn place_order(&self, params: PostOrderParams) -> Result<Value, BitmexHttpError> {
    let body = serde_urlencoded::to_string(&params)
        .map_err(|e| {
            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
        })?
        .into_bytes();
    let path = "/order";
    self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
        .await
}
/// Cancels orders with an authenticated `DELETE /order`, sending `params` as
/// a form-encoded body.
///
/// # Errors
///
/// Returns a validation error if `params` cannot be serialized, or an error
/// if the request fails or the response cannot be parsed.
pub async fn cancel_orders(&self, params: DeleteOrderParams) -> Result<Value, BitmexHttpError> {
    let body = serde_urlencoded::to_string(&params)
        .map_err(|e| {
            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
        })?
        .into_bytes();
    let path = "/order";
    self.send_request::<_, ()>(Method::DELETE, path, None, Some(body), true)
        .await
}
/// Amends an order with an authenticated `PUT /order`, sending `params` as a
/// form-encoded body.
///
/// # Errors
///
/// Returns a validation error if `params` cannot be serialized, or an error
/// if the request fails or the response cannot be parsed.
pub async fn amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
    let body = serde_urlencoded::to_string(&params)
        .map_err(|e| {
            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
        })?
        .into_bytes();
    let path = "/order";
    self.send_request::<_, ()>(Method::PUT, path, None, Some(body), true)
        .await
}
/// Cancels all orders with an authenticated `DELETE /order/all`, passing
/// `params` as the query string.
///
/// # Errors
///
/// Returns an error if the request fails or the response cannot be parsed.
pub async fn cancel_all_orders(
    &self,
    params: DeleteAllOrdersParams,
) -> Result<Value, BitmexHttpError> {
    self.send_request(Method::DELETE, "/order/all", Some(&params), None, true)
        .await
}
/// Sends an authenticated `POST /order/cancelAllAfter` with `params` as a
/// form-encoded body.
///
/// # Errors
///
/// Returns a validation error if `params` cannot be serialized, or an error
/// if the request fails or the response cannot be parsed.
pub async fn cancel_all_after(
    &self,
    params: PostCancelAllAfterParams,
) -> Result<Value, BitmexHttpError> {
    let body = serde_urlencoded::to_string(&params)
        .map_err(|e| {
            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
        })?
        .into_bytes();
    self.send_request::<_, ()>(
        Method::POST,
        "/order/cancelAllAfter",
        None,
        Some(body),
        true,
    )
    .await
}
/// Fetches executions with an authenticated
/// `GET /execution/tradeHistory`, embedding the URL-encoded `params`
/// directly in the path.
///
/// # Errors
///
/// Returns a validation error if `params` cannot be serialized, or an error
/// if the request fails or the response cannot be parsed.
pub async fn get_executions(
    &self,
    params: GetExecutionParams,
) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
    let query = serde_urlencoded::to_string(&params).map_err(|e| {
        BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
    })?;
    let path = format!("/execution/tradeHistory?{query}");
    self.send_request::<_, ()>(Method::GET, &path, None, None, true)
        .await
}
/// Fetches positions with an authenticated `GET /position`, passing `params`
/// as the query string.
///
/// # Errors
///
/// Returns an error if the request fails or the response cannot be parsed.
pub async fn get_positions(
    &self,
    params: GetPositionParams,
) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
    self.send_request(Method::GET, "/position", Some(&params), None, true)
        .await
}
/// Updates position leverage with an authenticated
/// `POST /position/leverage`, sending `params` as a form-encoded body.
///
/// # Errors
///
/// Returns a validation error if `params` cannot be serialized, or an error
/// if the request fails or the response cannot be parsed.
pub async fn update_position_leverage(
    &self,
    params: PostPositionLeverageParams,
) -> Result<BitmexPosition, BitmexHttpError> {
    let body = serde_urlencoded::to_string(&params)
        .map_err(|e| {
            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
        })?
        .into_bytes();
    let path = "/position/leverage";
    self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
        .await
}
}
/// High-level BitMEX HTTP client that wraps [`BitmexRawHttpClient`] and adds
/// an instrument cache, an order-type cache and a clock for timestamps.
#[derive(Debug)]
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex", from_py_object)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.bitmex")
)]
pub struct BitmexHttpClient {
/// Instruments keyed by raw symbol; shared between clones of this client.
pub(crate) instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
/// Order types keyed by client order ID; shared between clones of this client.
pub(crate) order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
/// Clock used to generate `ts_init` timestamps.
clock: &'static AtomicTime,
/// The underlying raw client; shared between clones of this client.
inner: Arc<BitmexRawHttpClient>,
/// Set to true once any instrument has been cached through this client.
cache_initialized: AtomicBool,
}
impl Clone for BitmexHttpClient {
fn clone(&self) -> Self {
let cache_initialized = AtomicBool::new(false);
let is_initialized = self.cache_initialized.load(Ordering::Acquire);
if is_initialized {
cache_initialized.store(true, Ordering::Release);
}
Self {
inner: self.inner.clone(),
instruments_cache: self.instruments_cache.clone(),
order_type_cache: self.order_type_cache.clone(),
cache_initialized,
clock: self.clock,
}
}
}
impl Default for BitmexHttpClient {
    /// Creates a non-testnet client with no explicit URL or credentials, a
    /// 60 second timeout, 3 retries, and the default rate limits.
    ///
    /// # Panics
    ///
    /// Panics if the client cannot be constructed.
    fn default() -> Self {
        let timeout_secs = 60;
        let max_retries = 3;
        let retry_delay_ms = 1_000;
        let retry_delay_max_ms = 10_000;
        let recv_window_ms = 10_000;
        let client = Self::new(
            None,
            None,
            None,
            false,
            timeout_secs,
            max_retries,
            retry_delay_ms,
            retry_delay_max_ms,
            recv_window_ms,
            BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND,
            BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED,
            None,
        );
        client.expect("Failed to create default BitmexHttpClient")
    }
}
impl BitmexHttpClient {
/// Creates a new [`BitmexHttpClient`].
///
/// When `base_url` is `None` the testnet or live URL is chosen from
/// `testnet`. Missing credentials are looked up through the environment
/// variables selected by `credential_env_vars(testnet)`. With both a key and
/// a secret the client is authenticated; with neither it is unauthenticated.
///
/// # Errors
///
/// Returns a validation error if only one of key and secret is available,
/// or an error if the underlying raw client cannot be created.
#[allow(clippy::too_many_arguments)]
pub fn new(
    base_url: Option<String>,
    api_key: Option<String>,
    api_secret: Option<String>,
    testnet: bool,
    timeout_secs: u64,
    max_retries: u32,
    retry_delay_ms: u64,
    retry_delay_max_ms: u64,
    recv_window_ms: u64,
    max_requests_per_second: u32,
    max_requests_per_minute: u32,
    proxy_url: Option<String>,
) -> Result<Self, BitmexHttpError> {
    let url = match base_url {
        Some(explicit) => explicit,
        None if testnet => BITMEX_HTTP_TESTNET_URL.to_string(),
        None => BITMEX_HTTP_URL.to_string(),
    };

    let (key_var, secret_var) = credential_env_vars(testnet);
    let resolved_key = get_or_env_var_opt(api_key, key_var);
    let resolved_secret = get_or_env_var_opt(api_secret, secret_var);

    let raw_client = match (resolved_key, resolved_secret) {
        (None, None) => BitmexRawHttpClient::new(
            Some(url),
            timeout_secs,
            max_retries,
            retry_delay_ms,
            retry_delay_max_ms,
            recv_window_ms,
            max_requests_per_second,
            max_requests_per_minute,
            proxy_url,
        )?,
        (Some(key), Some(secret)) => BitmexRawHttpClient::with_credentials(
            key,
            secret,
            url,
            timeout_secs,
            max_retries,
            retry_delay_ms,
            retry_delay_max_ms,
            recv_window_ms,
            max_requests_per_second,
            max_requests_per_minute,
            proxy_url,
        )?,
        // Exactly one of key and secret is present.
        _ => {
            return Err(BitmexHttpError::ValidationError(
                "Both api_key and api_secret must be provided, or neither".to_string(),
            ));
        }
    };

    Ok(Self {
        instruments_cache: Arc::new(AtomicMap::new()),
        order_type_cache: Arc::new(DashMap::new()),
        clock: get_atomic_clock_realtime(),
        inner: Arc::new(raw_client),
        cache_initialized: AtomicBool::new(false),
    })
}
pub fn from_env() -> anyhow::Result<Self> {
Self::with_credentials(
None, None, None, 60, 3, 1_000, 10_000, 10_000, 10, 120, None,
)
.map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
}
/// Creates a client from optional credentials, resolving missing values
/// through the environment.
///
/// Testnet is assumed when `base_url` is given and contains `"testnet"`.
///
/// # Errors
///
/// Returns an error if only one of key and secret is available, or if the
/// client cannot be created.
#[allow(clippy::too_many_arguments)]
pub fn with_credentials(
    api_key: Option<String>,
    api_secret: Option<String>,
    base_url: Option<String>,
    timeout_secs: u64,
    max_retries: u32,
    retry_delay_ms: u64,
    retry_delay_max_ms: u64,
    recv_window_ms: u64,
    max_requests_per_second: u32,
    max_requests_per_minute: u32,
    proxy_url: Option<String>,
) -> anyhow::Result<Self> {
    let testnet = match base_url.as_deref() {
        Some(url) => url.contains("testnet"),
        None => false,
    };
    let (key_var, secret_var) = credential_env_vars(testnet);
    let api_key = get_or_env_var_opt(api_key, key_var);
    let api_secret = get_or_env_var_opt(api_secret, secret_var);

    // Name the missing environment variable in the error.
    match (&api_key, &api_secret) {
        (Some(_), None) => {
            anyhow::bail!("{secret_var} is required when {key_var} is provided")
        }
        (None, Some(_)) => {
            anyhow::bail!("{key_var} is required when {secret_var} is provided")
        }
        _ => {}
    }

    match Self::new(
        base_url,
        api_key,
        api_secret,
        testnet,
        timeout_secs,
        max_retries,
        retry_delay_ms,
        retry_delay_max_ms,
        recv_window_ms,
        max_requests_per_second,
        max_requests_per_minute,
        proxy_url,
    ) {
        Ok(client) => Ok(client),
        Err(e) => Err(anyhow::anyhow!("Failed to create HTTP client: {e}")),
    }
}
#[must_use]
pub fn base_url(&self) -> &str {
self.inner.base_url.as_str()
}
/// Returns the API key, or `None` if the client has no credential.
#[must_use]
pub fn api_key(&self) -> Option<&str> {
    match self.inner.credential.as_ref() {
        Some(credential) => Some(credential.api_key()),
        None => None,
    }
}
/// Returns the masked API key, or `None` if the client has no credential.
#[must_use]
pub fn api_key_masked(&self) -> Option<String> {
    let credential = self.inner.credential.as_ref()?;
    Some(credential.api_key_masked())
}
pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
self.inner.get_server_time().await
}
/// Sends a cancel-all-after request with `timeout_ms` as the timeout and
/// discards the response body.
///
/// # Errors
///
/// Returns an error if the request fails.
pub async fn cancel_all_after(&self, timeout_ms: u64) -> anyhow::Result<()> {
    let request = PostCancelAllAfterParams {
        timeout: timeout_ms,
    };
    match self.inner.cancel_all_after(request).await {
        Ok(_) => Ok(()),
        Err(e) => Err(e.into()),
    }
}
fn generate_ts_init(&self) -> UnixNanos {
self.clock.get_time_ns()
}
/// Returns true for the OCO, OTO and OUO contingency types.
fn is_contingent_order(contingency_type: ContingencyType) -> bool {
    match contingency_type {
        ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo => true,
        _ => false,
    }
}
/// Returns true for the OCO and OTO contingency types.
fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
    match contingency_type {
        ContingencyType::Oco | ContingencyType::Oto => true,
        _ => false,
    }
}
/// Fills in `linked_order_ids` and `parent_order_id` on contingent reports.
///
/// Reports are grouped first by `order_list_id` and, as a fallback, by the
/// part of the client order ID before its last `-`. A contingent report that
/// has no peers in either grouping is downgraded to
/// `ContingencyType::NoContingency` with its linkage cleared.
///
/// Reports without a client order ID, reports that already carry linked IDs,
/// and non-contingent reports are left untouched.
fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();
// First pass: collect group members and the first parent candidate per group.
for report in reports.iter() {
let Some(client_order_id) = report.client_order_id else {
continue;
};
// Every report with an order list ID joins that list's group,
// regardless of its contingency type.
if let Some(order_list_id) = report.order_list_id {
order_list_groups
.entry(order_list_id)
.or_default()
.push(client_order_id);
if Self::is_parent_contingency(report.contingency_type) {
// `or_insert` keeps the first parent seen for the list.
order_list_parents
.entry(order_list_id)
.or_insert(client_order_id);
}
}
// Only contingent reports join a prefix group.
if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
&& Self::is_contingent_order(report.contingency_type)
{
prefix_groups
.entry(base.to_owned())
.or_default()
.push(client_order_id);
if Self::is_parent_contingency(report.contingency_type) {
prefix_parents
.entry(base.to_owned())
.or_insert(client_order_id);
}
}
}
// Second pass: assign linkage, preferring the order list grouping.
for report in reports.iter_mut() {
let Some(client_order_id) = report.client_order_id else {
continue;
};
if report.linked_order_ids.is_some() {
continue;
}
if !Self::is_contingent_order(report.contingency_type) {
continue;
}
if let Some(order_list_id) = report.order_list_id
&& let Some(group) = order_list_groups.get(&order_list_id)
{
// Peers are all group members other than this report.
let mut linked: Vec<ClientOrderId> = group
.iter()
.copied()
.filter(|candidate| candidate != &client_order_id)
.collect();
if !linked.is_empty() {
if let Some(parent_id) = order_list_parents.get(&order_list_id) {
if client_order_id == *parent_id {
report.parent_order_id = None;
} else {
// Stable sort that moves the parent to the front of the peers.
linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
report.parent_order_id = Some(*parent_id);
}
} else {
report.parent_order_id = None;
}
log::trace!(
"BitMEX linked ids sourced from order list id: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, linked_order_ids={:?}",
client_order_id,
order_list_id,
report.contingency_type,
linked,
);
report.linked_order_ids = Some(linked);
continue;
}
log::trace!(
"BitMEX order list id group had no peers: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, order_list_group={:?}",
client_order_id,
order_list_id,
report.contingency_type,
group,
);
report.parent_order_id = None;
} else if report.order_list_id.is_none() {
report.parent_order_id = None;
}
// Fallback: group by client order ID prefix.
if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
&& let Some(group) = prefix_groups.get(base)
{
let mut linked: Vec<ClientOrderId> = group
.iter()
.copied()
.filter(|candidate| candidate != &client_order_id)
.collect();
if !linked.is_empty() {
if let Some(parent_id) = prefix_parents.get(base) {
if client_order_id == *parent_id {
report.parent_order_id = None;
} else {
// Stable sort that moves the parent to the front of the peers.
linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
report.parent_order_id = Some(*parent_id);
}
} else {
report.parent_order_id = None;
}
log::trace!(
"BitMEX linked ids constructed from client order id prefix: client_order_id={:?}, contingency_type={:?}, base={}, linked_order_ids={:?}",
client_order_id,
report.contingency_type,
base,
linked,
);
report.linked_order_ids = Some(linked);
continue;
}
log::trace!(
"BitMEX client order id prefix group had no peers: client_order_id={:?}, contingency_type={:?}, base={}, prefix_group={:?}",
client_order_id,
report.contingency_type,
base,
group,
);
report.parent_order_id = None;
} else if client_order_id.as_str().contains('-') {
report.parent_order_id = None;
}
// No peers were found in either grouping. The report is contingent here
// (non-contingent reports were skipped above), so this check always holds
// and the report is downgraded.
if Self::is_contingent_order(report.contingency_type) {
log::warn!(
"BitMEX order status report missing linked ids after grouping: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}",
report.client_order_id,
report.order_list_id,
report.contingency_type,
);
report.contingency_type = ContingencyType::NoContingency;
report.parent_order_id = None;
}
report.linked_order_ids = None;
}
}
pub fn cancel_all_requests(&self) {
self.inner.cancel_all_requests();
}
pub fn reset_cancellation_token(&self) {
self.inner.reset_cancellation_token();
}
pub fn cancellation_token(&self) -> CancellationToken {
self.inner.cancellation_token()
}
/// Stores `instrument` in the cache under its raw symbol and marks the cache
/// as initialized.
pub fn cache_instrument(&self, instrument: InstrumentAny) {
    let key = instrument.raw_symbol().inner();
    self.instruments_cache.insert(key, instrument);
    self.cache_initialized.store(true, Ordering::Release);
}
/// Stores clones of all `instruments` in the cache under their raw symbols
/// in a single `rcu` update and marks the cache as initialized.
pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
    self.instruments_cache.rcu(|map| {
        instruments.iter().for_each(|instrument| {
            map.insert(instrument.raw_symbol().inner(), instrument.clone());
        });
    });
    self.cache_initialized.store(true, Ordering::Release);
}
/// Returns a clone of the cached instrument for `symbol`, if present.
pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
    let cache = &self.instruments_cache;
    cache.get_cloned(symbol)
}
/// Requests a single instrument by the symbol of `instrument_id` and parses
/// it.
///
/// Returns `Ok(None)` if BitMEX returns no instrument, or if the instrument
/// is unsupported, inactive, or fails to parse (each case is logged).
///
/// # Errors
///
/// Returns an error if the HTTP request fails.
pub async fn request_instrument(
    &self,
    instrument_id: InstrumentId,
) -> anyhow::Result<Option<InstrumentAny>> {
    let symbol_str = instrument_id.symbol.as_str();
    let Some(raw) = self.inner.get_instrument(symbol_str).await? else {
        return Ok(None);
    };

    let ts_init = self.generate_ts_init();
    let parsed = match parse_instrument_any(&raw, ts_init) {
        InstrumentParseResult::Ok(inst) => Some(*inst),
        InstrumentParseResult::Unsupported {
            symbol,
            instrument_type,
        } => {
            log::debug!(
                "Instrument {symbol} has unsupported type {instrument_type:?}, returning None"
            );
            None
        }
        InstrumentParseResult::Inactive { symbol, state } => {
            log::debug!("Instrument {symbol} is inactive (state={state}), returning None");
            None
        }
        InstrumentParseResult::Failed {
            symbol,
            instrument_type,
            error,
        } => {
            log::error!(
                "Failed to parse instrument {symbol} (type={instrument_type:?}): {error}"
            );
            None
        }
    };
    Ok(parsed)
}
/// Requests instruments and parses them, skipping any that are unsupported,
/// inactive, or fail to parse.
///
/// Each skipped instrument is logged individually, followed by one summary
/// line per non-zero category.
///
/// # Errors
///
/// Returns an error if the HTTP request fails.
pub async fn request_instruments(
    &self,
    active_only: bool,
) -> anyhow::Result<Vec<InstrumentAny>> {
    let raw_instruments = self.inner.get_instruments(active_only).await?;
    let ts_init = self.generate_ts_init();
    let total_count = raw_instruments.len();

    let mut parsed_instruments = Vec::new();
    let (mut skipped_count, mut inactive_count, mut failed_count) = (0, 0, 0);

    for raw in raw_instruments.iter() {
        match parse_instrument_any(raw, ts_init) {
            InstrumentParseResult::Ok(instrument_any) => {
                parsed_instruments.push(*instrument_any);
            }
            InstrumentParseResult::Unsupported {
                symbol,
                instrument_type,
            } => {
                skipped_count += 1;
                log::debug!(
                    "Skipping unsupported instrument type: symbol={symbol}, type={instrument_type:?}"
                );
            }
            InstrumentParseResult::Inactive { symbol, state } => {
                inactive_count += 1;
                log::debug!("Skipping inactive instrument: symbol={symbol}, state={state}");
            }
            InstrumentParseResult::Failed {
                symbol,
                instrument_type,
                error,
            } => {
                failed_count += 1;
                log::error!(
                    "Failed to parse instrument: symbol={symbol}, type={instrument_type:?}, error={error}"
                );
            }
        }
    }

    // Summaries are emitted only for categories that actually occurred.
    if skipped_count > 0 {
        log::info!(
            "Skipped {skipped_count} unsupported instrument type(s) out of {total_count} total"
        );
    }
    if inactive_count > 0 {
        log::info!(
            "Skipped {inactive_count} inactive instrument(s) out of {total_count} total"
        );
    }
    if failed_count > 0 {
        log::error!(
            "Instrument parse failures: {failed_count} failed out of {total_count} total ({} successfully parsed)",
            parsed_instruments.len()
        );
    }

    Ok(parsed_instruments)
}
/// Fetches the user wallet through the raw client.
///
/// # Errors
///
/// Returns an error if the request fails.
pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
    // `&self` already keeps the raw client alive; no `Arc` clone is needed.
    self.inner.get_wallet().await
}
/// Fetches orders through the raw client.
///
/// # Errors
///
/// Returns an error if the request fails.
pub async fn get_orders(
    &self,
    params: GetOrderParams,
) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
    // `&self` already keeps the raw client alive; no `Arc` clone is needed.
    self.inner.get_orders(params).await
}
/// Returns the cached instrument for `symbol`.
///
/// # Errors
///
/// Returns an error if the instrument is not in the cache.
fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
    match self.get_instrument(&symbol) {
        Some(instrument) => Ok(instrument),
        None => Err(anyhow::anyhow!(
            "Instrument {symbol} not found in cache, ensure instruments loaded first"
        )),
    }
}
/// Returns the price precision of the cached instrument for `symbol`.
///
/// # Errors
///
/// Returns an error if the instrument is not in the cache.
pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
    let instrument = self.instrument_from_cache(symbol)?;
    Ok(instrument.price_precision())
}
/// Fetches margin data for `currency` through the raw client.
///
/// # Errors
///
/// Returns an error if the request fails.
pub async fn get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
    let margin = self.inner.get_margin(currency).await?;
    Ok(margin)
}
/// Fetches margin data for all currencies through the raw client.
///
/// # Errors
///
/// Returns an error if the request fails.
pub async fn get_all_margins(&self) -> anyhow::Result<Vec<BitmexMargin>> {
    let margins = self.inner.get_all_margins().await?;
    Ok(margins)
}
/// Requests margin data for all currencies and converts it into an
/// [`AccountState`] for `account_id`.
///
/// One balance is produced per margin entry. A margin balance is added only
/// when the entry's initial or maintenance margin is non-zero after negative
/// values are clamped to zero. `ts_event` is the latest timestamp found on
/// the margin entries, falling back to `ts_init` when none carries one.
///
/// # Errors
///
/// Returns an error if the HTTP request fails or BitMEX returns no margin
/// entries.
pub async fn request_account_state(
    &self,
    account_id: AccountId,
) -> anyhow::Result<AccountState> {
    let margins = self
        .inner
        .get_all_margins()
        .await
        .map_err(|e| anyhow::anyhow!(e))?;
    // Take the initialization timestamp from the client's clock, as the
    // other request methods do.
    let ts_init = self.generate_ts_init();
    let mut balances = Vec::with_capacity(margins.len());
    let mut margins_vec = Vec::new();
    let mut latest_timestamp: Option<chrono::DateTime<chrono::Utc>> = None;
    for margin in margins {
        if let Some(ts) = margin.timestamp {
            latest_timestamp = Some(latest_timestamp.map_or(ts, |prev| prev.max(ts)));
        }
        // Re-shape the HTTP margin into the websocket message type so the
        // shared balance parser can be reused.
        let margin_msg = BitmexMarginMsg {
            account: margin.account,
            currency: margin.currency,
            risk_limit: margin.risk_limit,
            amount: margin.amount,
            prev_realised_pnl: margin.prev_realised_pnl,
            gross_comm: margin.gross_comm,
            gross_open_cost: margin.gross_open_cost,
            gross_open_premium: margin.gross_open_premium,
            gross_exec_cost: margin.gross_exec_cost,
            gross_mark_value: margin.gross_mark_value,
            risk_value: margin.risk_value,
            init_margin: margin.init_margin,
            maint_margin: margin.maint_margin,
            target_excess_margin: margin.target_excess_margin,
            realised_pnl: margin.realised_pnl,
            unrealised_pnl: margin.unrealised_pnl,
            wallet_balance: margin.wallet_balance,
            margin_balance: margin.margin_balance,
            margin_leverage: margin.margin_leverage,
            margin_used_pcnt: margin.margin_used_pcnt,
            excess_margin: margin.excess_margin,
            available_margin: margin.available_margin,
            withdrawable_margin: margin.withdrawable_margin,
            maker_fee_discount: None,
            taker_fee_discount: None,
            timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
            foreign_margin_balance: None,
            foreign_requirement: None,
        };
        let balance = parse_account_balance(&margin_msg);
        let divisor = bitmex_currency_divisor(margin_msg.currency.as_str());
        // Missing margins count as zero and negative margins are clamped to zero.
        let initial_dec = Decimal::from(margin_msg.init_margin.unwrap_or(0).max(0)) / divisor;
        let maintenance_dec =
            Decimal::from(margin_msg.maint_margin.unwrap_or(0).max(0)) / divisor;
        if !initial_dec.is_zero() || !maintenance_dec.is_zero() {
            let currency = balance.total.currency;
            let currency_str = map_bitmex_currency(margin_msg.currency.as_str());
            // Account-level margins are attached to a synthetic per-currency instrument ID.
            let margin_instrument_id = InstrumentId::new(
                Symbol::from_str_unchecked(format!("ACCOUNT-{currency_str}")),
                *BITMEX_VENUE,
            );
            margins_vec.push(MarginBalance::new(
                Money::from_decimal(initial_dec, currency)
                    .unwrap_or_else(|_| Money::zero(currency)),
                Money::from_decimal(maintenance_dec, currency)
                    .unwrap_or_else(|_| Money::zero(currency)),
                margin_instrument_id,
            ));
        }
        balances.push(balance);
    }
    if balances.is_empty() {
        anyhow::bail!("No margin data returned from BitMEX");
    }
    let account_type = AccountType::Margin;
    let is_reported = true;
    let event_id = UUID4::new();
    let ts_event = latest_timestamp.map_or(ts_init, |ts| {
        UnixNanos::from(ts.timestamp_nanos_opt().unwrap_or_default() as u64)
    });
    Ok(AccountState::new(
        account_id,
        account_type,
        balances,
        margins_vec,
        is_reported,
        event_id,
        ts_event,
        ts_init,
        None,
    ))
}
/// Submits a new order to BitMEX and returns the resulting order status report.
///
/// Trailing stop orders (`TrailingStopMarket` / `TrailingStopLimit`) with a
/// `trailing_offset` are sent with a `TrailingStopPeg` peg whose offset is made
/// negative for sells and positive for buys. An explicit `peg_price_type` is only
/// accepted for LIMIT orders and switches the venue order type to `Pegged`.
///
/// # Errors
///
/// Returns an error if:
/// - The instrument is not found in the cache.
/// - `order_side` is `NoOrderSide`.
/// - The order type, time in force, or contingency type cannot be converted.
/// - A trailing stop is given a `trailing_offset_type` other than `Price`.
/// - `peg_offset_value` is given without `peg_price_type`.
/// - `peg_price_type` is given for a non-LIMIT order.
/// - The parameters fail to build, the request fails, or the response cannot be parsed.
/// - The returned order has status `Rejected`.
// The argument list mirrors the full set of order attributes accepted by this call.
#[allow(clippy::too_many_arguments)]
pub async fn submit_order(
    &self,
    instrument_id: InstrumentId,
    client_order_id: ClientOrderId,
    order_side: OrderSide,
    order_type: OrderType,
    quantity: Quantity,
    time_in_force: TimeInForce,
    price: Option<Price>,
    trigger_price: Option<Price>,
    trigger_type: Option<TriggerType>,
    trailing_offset: Option<f64>,
    trailing_offset_type: Option<TrailingOffsetType>,
    display_qty: Option<Quantity>,
    post_only: bool,
    reduce_only: bool,
    order_list_id: Option<OrderListId>,
    contingency_type: Option<ContingencyType>,
    peg_price_type: Option<BitmexPegPriceType>,
    peg_offset_value: Option<f64>,
) -> anyhow::Result<OrderStatusReport> {
    // Resolved once up front: needed for quantity conversion before the request
    // and for parsing the response afterwards.
    let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;

    let mut params = super::query::PostOrderParamsBuilder::default();
    params.text(NAUTILUS_TRADER);
    params.symbol(instrument_id.symbol.as_str());
    params.cl_ord_id(client_order_id.as_str());

    if order_side == OrderSide::NoOrderSide {
        anyhow::bail!("Order side must be Buy or Sell");
    }
    let side = BitmexSide::from(order_side.as_specified());
    params.side(side);

    let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
    params.ord_type(ord_type);
    params.order_qty(quantity_to_u32(&quantity, &instrument));

    let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
    params.time_in_force(tif);

    if let Some(price) = price {
        params.price(price.as_f64());
    }
    if let Some(trigger_price) = trigger_price {
        params.stop_px(trigger_price.as_f64());
    }
    if let Some(display_qty) = display_qty {
        params.display_qty(quantity_to_u32(&display_qty, &instrument));
    }
    if let Some(order_list_id) = order_list_id {
        params.cl_ord_link_id(order_list_id.as_str());
    }

    let is_trailing_stop = matches!(
        order_type,
        OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
    );
    if is_trailing_stop && let Some(offset) = trailing_offset {
        if let Some(offset_type) = trailing_offset_type
            && offset_type != TrailingOffsetType::Price
        {
            anyhow::bail!(
                "BitMEX only supports PRICE trailing offset type, was {offset_type:?}"
            );
        }
        params.peg_price_type(BitmexPegPriceType::TrailingStopPeg);
        // The sign of the offset is derived from the order side, regardless of
        // the sign the caller supplied.
        let signed_offset = match order_side {
            OrderSide::Sell => -offset.abs(),
            OrderSide::Buy => offset.abs(),
            _ => offset,
        };
        params.peg_offset_value(signed_offset);
    }

    if peg_price_type.is_none() && peg_offset_value.is_some() {
        anyhow::bail!("`peg_offset_value` requires `peg_price_type`");
    }
    if let Some(peg_type) = peg_price_type {
        if order_type != OrderType::Limit {
            anyhow::bail!(
                "Pegged orders only supported for LIMIT order type, was {order_type:?}"
            );
        }
        // An explicit peg overrides the order type set above.
        params.ord_type(BitmexOrderType::Pegged);
        params.peg_price_type(peg_type);
        if let Some(offset) = peg_offset_value {
            params.peg_offset_value(offset);
        }
    }

    let mut exec_inst = Vec::new();
    if post_only {
        exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
    }
    if reduce_only {
        exec_inst.push(BitmexExecInstruction::ReduceOnly);
    }
    if (trigger_price.is_some() || is_trailing_stop)
        && let Some(trigger_type) = trigger_type
    {
        match trigger_type {
            TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
            TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
            TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
            // Any other trigger type adds no execution instruction.
            _ => {}
        }
    }
    if !exec_inst.is_empty() {
        params.exec_inst(exec_inst);
    }

    if let Some(contingency_type) = contingency_type {
        let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
        params.contingency_type(bitmex_contingency);
    }

    let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
    let response = self.inner.place_order(params).await?;
    let order: BitmexOrder = serde_json::from_value(response)?;

    if order.ord_status == Some(BitmexOrderStatus::Rejected) {
        let reason = order
            .ord_rej_reason
            .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
        anyhow::bail!("Order rejected: {reason}");
    }

    self.order_type_cache.insert(client_order_id, order_type);

    // Reuse the instrument resolved before the request: a second cache lookup here
    // could fail after the order is already live and hide a successful submission.
    let ts_init = self.generate_ts_init();
    parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
}
pub async fn cancel_order(
&self,
instrument_id: InstrumentId,
client_order_id: Option<ClientOrderId>,
venue_order_id: Option<VenueOrderId>,
) -> anyhow::Result<OrderStatusReport> {
let mut params = super::query::DeleteOrderParamsBuilder::default();
params.text(NAUTILUS_TRADER);
if let Some(venue_order_id) = venue_order_id {
params.order_id(vec![venue_order_id.as_str().to_string()]);
} else if let Some(client_order_id) = client_order_id {
params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
} else {
anyhow::bail!("Either client_order_id or venue_order_id must be provided");
}
let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
let response = self.inner.cancel_orders(params).await?;
let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
let order = orders
.into_iter()
.next()
.ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
let ts_init = self.generate_ts_init();
parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
}
/// Cancels several orders in one request and returns a status report per order in
/// the venue's response.
///
/// When both identifier lists are supplied, `venue_order_ids` is the one sent.
///
/// # Errors
///
/// Returns an error if neither list is provided, the chosen list is empty, the
/// parameters fail to build, the request fails, the response cannot be deserialized,
/// the instrument is not in the cache, or any returned order fails to parse.
pub async fn cancel_orders(
    &self,
    instrument_id: InstrumentId,
    client_order_ids: Option<Vec<ClientOrderId>>,
    venue_order_ids: Option<Vec<VenueOrderId>>,
) -> anyhow::Result<Vec<OrderStatusReport>> {
    let mut builder = super::query::DeleteOrderParamsBuilder::default();
    builder.text(NAUTILUS_TRADER);

    match (venue_order_ids, client_order_ids) {
        (Some(venue_ids), _) => {
            if venue_ids.is_empty() {
                anyhow::bail!("venue_order_ids cannot be empty");
            }
            let ids = venue_ids
                .iter()
                .map(|venue_id| venue_id.to_string())
                .collect::<Vec<_>>();
            builder.order_id(ids);
        }
        (None, Some(client_ids)) => {
            if client_ids.is_empty() {
                anyhow::bail!("client_order_ids cannot be empty");
            }
            let ids = client_ids
                .iter()
                .map(|client_id| client_id.to_string())
                .collect::<Vec<_>>();
            builder.cl_ord_id(ids);
        }
        (None, None) => {
            anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
        }
    }

    let request = builder.build().map_err(|e| anyhow::anyhow!(e))?;
    let raw = self.inner.cancel_orders(request).await?;
    let cancelled: Vec<BitmexOrder> = serde_json::from_value(raw)?;

    let ts_init = self.generate_ts_init();
    let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;

    // Stops at the first order that fails to parse, propagating its error.
    let mut reports = cancelled
        .iter()
        .map(|order| {
            parse_order_status_report(order, &instrument, &self.order_type_cache, ts_init)
        })
        .collect::<anyhow::Result<Vec<_>>>()?;

    Self::populate_linked_order_ids(&mut reports);
    Ok(reports)
}
pub async fn cancel_all_orders(
&self,
instrument_id: InstrumentId,
order_side: Option<OrderSide>,
) -> anyhow::Result<Vec<OrderStatusReport>> {
let mut params = DeleteAllOrdersParamsBuilder::default();
params.text(NAUTILUS_TRADER);
params.symbol(instrument_id.symbol.as_str());
if let Some(side) = order_side {
if side == OrderSide::NoOrderSide {
log::debug!("Ignoring NoOrderSide filter for cancel_all_orders on {instrument_id}",);
} else {
let side = BitmexSide::from(side.as_specified());
params.filter(serde_json::json!({
"side": side
}));
}
}
let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
let response = self.inner.cancel_all_orders(params).await?;
let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
let ts_init = self.generate_ts_init();
let mut reports = Vec::new();
for order in orders {
reports.push(parse_order_status_report(
&order,
&instrument,
&self.order_type_cache,
ts_init,
)?);
}
Self::populate_linked_order_ids(&mut reports);
Ok(reports)
}
pub async fn modify_order(
&self,
instrument_id: InstrumentId,
client_order_id: Option<ClientOrderId>,
venue_order_id: Option<VenueOrderId>,
quantity: Option<Quantity>,
price: Option<Price>,
trigger_price: Option<Price>,
) -> anyhow::Result<OrderStatusReport> {
let mut params = PutOrderParamsBuilder::default();
params.text(NAUTILUS_TRADER);
if let Some(venue_order_id) = venue_order_id {
params.order_id(venue_order_id.as_str());
} else if let Some(client_order_id) = client_order_id {
params.orig_cl_ord_id(client_order_id.as_str());
} else {
anyhow::bail!("Either client_order_id or venue_order_id must be provided");
}
if let Some(quantity) = quantity {
let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
params.order_qty(quantity_to_u32(&quantity, &instrument));
}
if let Some(price) = price {
params.price(price.as_f64());
}
if let Some(trigger_price) = trigger_price {
params.stop_px(trigger_price.as_f64());
}
let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
let response = self.inner.amend_order(params).await?;
let order: BitmexOrder = serde_json::from_value(response)?;
if order.ord_status == Some(BitmexOrderStatus::Rejected) {
let reason = order
.ord_rej_reason
.map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
anyhow::bail!("Order modification rejected: {reason}");
}
let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
let ts_init = self.generate_ts_init();
parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
}
pub async fn query_order(
&self,
instrument_id: InstrumentId,
client_order_id: Option<ClientOrderId>,
venue_order_id: Option<VenueOrderId>,
) -> anyhow::Result<Option<OrderStatusReport>> {
let mut params = GetOrderParamsBuilder::default();
let filter_json = if let Some(client_order_id) = client_order_id {
serde_json::json!({
"clOrdID": client_order_id.to_string()
})
} else if let Some(venue_order_id) = venue_order_id {
serde_json::json!({
"orderID": venue_order_id.to_string()
})
} else {
anyhow::bail!("Either client_order_id or venue_order_id must be provided");
};
params.filter(filter_json);
params.count(1);
let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
let response = self.inner.get_orders(params).await?;
if response.is_empty() {
return Ok(None);
}
let order = &response[0];
let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
let ts_init = self.generate_ts_init();
let report =
parse_order_status_report(order, &instrument, &self.order_type_cache, ts_init)?;
Ok(Some(report))
}
pub async fn request_order_status_report(
&self,
instrument_id: InstrumentId,
client_order_id: Option<ClientOrderId>,
venue_order_id: Option<VenueOrderId>,
) -> anyhow::Result<OrderStatusReport> {
if venue_order_id.is_none() && client_order_id.is_none() {
anyhow::bail!("Either venue_order_id or client_order_id must be provided");
}
let mut params = GetOrderParamsBuilder::default();
params.symbol(instrument_id.symbol.as_str());
if let Some(venue_order_id) = venue_order_id {
params.filter(serde_json::json!({
"orderID": venue_order_id.as_str()
}));
} else if let Some(client_order_id) = client_order_id {
params.filter(serde_json::json!({
"clOrdID": client_order_id.as_str()
}));
}
params.count(1i32);
let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
let response = self.inner.get_orders(params).await?;
let order = response
.into_iter()
.next()
.ok_or_else(|| anyhow::anyhow!("Order not found"))?;
let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
let ts_init = self.generate_ts_init();
parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
}
pub async fn request_order_status_reports(
&self,
instrument_id: Option<InstrumentId>,
open_only: bool,
start: Option<DateTime<Utc>>,
end: Option<DateTime<Utc>>,
limit: Option<u32>,
) -> anyhow::Result<Vec<OrderStatusReport>> {
if let (Some(start), Some(end)) = (start, end) {
anyhow::ensure!(
start < end,
"Invalid time range: start={start:?} end={end:?}",
);
}
let mut params = GetOrderParamsBuilder::default();
if let Some(instrument_id) = &instrument_id {
params.symbol(instrument_id.symbol.as_str());
}
if open_only {
params.filter(serde_json::json!({
"open": true
}));
}
if let Some(start) = start {
params.start_time(start);
}
if let Some(end) = end {
params.end_time(end);
}
if let Some(limit) = limit {
params.count(limit as i32);
} else {
params.count(500); }
params.reverse(true);
let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
let response = self.inner.get_orders(params).await?;
let ts_init = self.generate_ts_init();
let mut reports = Vec::new();
for order in response {
if let Some(start) = start {
match order.timestamp {
Some(timestamp) if timestamp < start => continue,
Some(_) => {}
None => {
log::debug!("Skipping order report without timestamp for bounded query");
continue;
}
}
}
if let Some(end) = end {
match order.timestamp {
Some(timestamp) if timestamp > end => continue,
Some(_) => {}
None => {
log::debug!("Skipping order report without timestamp for bounded query");
continue;
}
}
}
let Some(symbol) = order.symbol else {
log::warn!("Order response missing symbol, skipping");
continue;
};
let Ok(instrument) = self.instrument_from_cache(symbol) else {
log::debug!("Skipping order report for instrument not in cache: symbol={symbol}");
continue;
};
match parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init) {
Ok(report) => reports.push(report),
Err(e) => log::error!("Failed to parse order status report: {e}"),
}
}
Self::populate_linked_order_ids(&mut reports);
Ok(reports)
}
/// Requests trades for an instrument and parses them into `TradeTick`s.
///
/// A `limit` above 1000 is clamped to 1000 with a warning. The request sets
/// `reverse(false)`. Returned trades are also filtered locally against `start` and
/// `end`; trades whose instrument is not cached or that fail to parse are logged and
/// skipped.
///
/// # Errors
///
/// Returns an error if `start >= end` when both are given, the parameters fail to
/// build, or the request fails.
pub async fn request_trades(
    &self,
    instrument_id: InstrumentId,
    start: Option<DateTime<Utc>>,
    end: Option<DateTime<Utc>>,
    limit: Option<u32>,
) -> anyhow::Result<Vec<TradeTick>> {
    if let (Some(start), Some(end)) = (start, end) {
        anyhow::ensure!(
            start < end,
            "Invalid time range: start={start:?} end={end:?}",
        );
    }

    let mut builder = GetTradeParamsBuilder::default();
    builder.symbol(instrument_id.symbol.as_str());
    if let Some(from) = start {
        builder.start_time(from);
    }
    if let Some(until) = end {
        builder.end_time(until);
    }
    if let Some(limit) = limit {
        let clamped_limit = limit.min(1000);
        if limit > 1000 {
            log::warn!(
                "BitMEX trade request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
            );
        }
        builder.count(i32::try_from(clamped_limit).unwrap_or(1000));
    }
    builder.reverse(false);

    let request = builder.build().map_err(|e| anyhow::anyhow!(e))?;
    let raw_trades = self.inner.get_trades(request).await?;
    let ts_init = self.generate_ts_init();

    let mut ticks = Vec::new();
    for raw in raw_trades {
        let before_start = start.is_some_and(|from| raw.timestamp < from);
        let after_end = end.is_some_and(|until| raw.timestamp > until);
        if before_start || after_end {
            continue;
        }

        let instrument = match self.get_instrument(&raw.symbol) {
            Some(instrument) => instrument,
            None => {
                log::error!(
                    "Instrument {} not found in cache, skipping trade",
                    raw.symbol
                );
                continue;
            }
        };

        match parse_trade(&raw, &instrument, ts_init) {
            Ok(tick) => ticks.push(tick),
            Err(e) => log::error!("Failed to parse trade: {e}"),
        }
    }

    Ok(ticks)
}
/// Requests bucketed trades for a bar type and parses them into `Bar`s.
///
/// Only EXTERNAL aggregation with LAST price type is accepted, and only the
/// 1-minute, 5-minute, 1-hour and 1-day specifications map to a bin size. A `limit`
/// above 1000 is clamped to 1000 with a warning. Returned bins are also filtered
/// locally against `start` and `end`; bins for a different symbol or that fail to
/// parse are logged and skipped.
///
/// # Errors
///
/// Returns an error if the bar type is unsupported, `start >= end` when both are
/// given, the instrument is not in the cache, the parameters fail to build, or the
/// request fails.
pub async fn request_bars(
    &self,
    mut bar_type: BarType,
    start: Option<DateTime<Utc>>,
    end: Option<DateTime<Utc>>,
    limit: Option<u32>,
    partial: bool,
) -> anyhow::Result<Vec<Bar>> {
    bar_type = bar_type.standard();

    anyhow::ensure!(
        bar_type.aggregation_source() == AggregationSource::External,
        "Only EXTERNAL aggregation bars are supported"
    );
    anyhow::ensure!(
        bar_type.spec().price_type == PriceType::Last,
        "Only LAST price type bars are supported"
    );
    if let (Some(start), Some(end)) = (start, end) {
        anyhow::ensure!(
            start < end,
            "Invalid time range: start={start:?} end={end:?}"
        );
    }

    // Map the bar specification onto one of the bin sizes this client supports.
    let spec = bar_type.spec();
    let step = spec.step.get();
    let bin_size = match spec.aggregation {
        BarAggregation::Minute if step == 1 => "1m",
        BarAggregation::Minute if step == 5 => "5m",
        BarAggregation::Hour if step == 1 => "1h",
        BarAggregation::Day if step == 1 => "1d",
        _ => anyhow::bail!(
            "BitMEX does not support {}-{:?}-{:?} bars",
            step,
            spec.aggregation,
            spec.price_type,
        ),
    };

    let instrument_id = bar_type.instrument_id();
    let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;

    let mut builder = GetTradeBucketedParamsBuilder::default();
    builder.symbol(instrument_id.symbol.as_str());
    builder.bin_size(bin_size);
    if partial {
        builder.partial(true);
    }
    if let Some(from) = start {
        builder.start_time(from);
    }
    if let Some(until) = end {
        builder.end_time(until);
    }
    if let Some(limit) = limit {
        let clamped_limit = limit.min(1000);
        if limit > 1000 {
            log::warn!(
                "BitMEX bar request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
            );
        }
        builder.count(i32::try_from(clamped_limit).unwrap_or(1000));
    }
    builder.reverse(false);

    let request = builder.build().map_err(|e| anyhow::anyhow!(e))?;
    let bins = self.inner.get_trade_bucketed(request).await?;
    let ts_init = self.generate_ts_init();

    let mut bars = Vec::new();
    for bin in bins {
        let before_start = start.is_some_and(|from| bin.timestamp < from);
        let after_end = end.is_some_and(|until| bin.timestamp > until);
        if before_start || after_end {
            continue;
        }

        if bin.symbol != instrument_id.symbol.inner() {
            log::warn!(
                "Skipping trade bin for unexpected symbol: symbol={}, expected={}",
                bin.symbol,
                instrument_id.symbol,
            );
            continue;
        }

        match parse_trade_bin(&bin, &instrument, &bar_type, ts_init) {
            Ok(bar) => bars.push(bar),
            Err(e) => log::warn!("Failed to parse trade bin: {e}"),
        }
    }

    Ok(bars)
}
/// Requests executions and parses them into fill reports, optionally filtered by
/// instrument and a time range.
///
/// The request uses `count = limit` (500 when `limit` is `None`) and `reverse(true)`.
/// Returned executions are additionally filtered locally: when `start` and/or `end`
/// is given, executions outside the range or without a `transact_time` are skipped.
/// Executions with no symbol, with an instrument missing from the cache, or that fail
/// to parse are logged and skipped rather than failing the whole call.
///
/// # Errors
///
/// Returns an error if `start >= end` when both are given, the parameters fail to
/// build, or the request fails.
pub async fn request_fill_reports(
    &self,
    instrument_id: Option<InstrumentId>,
    start: Option<DateTime<Utc>>,
    end: Option<DateTime<Utc>>,
    limit: Option<u32>,
) -> anyhow::Result<Vec<FillReport>> {
    if let (Some(start), Some(end)) = (start, end) {
        anyhow::ensure!(
            start < end,
            "Invalid time range: start={start:?} end={end:?}",
        );
    }

    let mut params = GetExecutionParamsBuilder::default();
    if let Some(instrument_id) = instrument_id {
        params.symbol(instrument_id.symbol.as_str());
    }
    if let Some(start) = start {
        params.start_time(start);
    }
    if let Some(end) = end {
        params.end_time(end);
    }

    // Saturate rather than `as`-cast: a `u32` above `i32::MAX` would otherwise wrap
    // to a negative count.
    let count = limit.map_or(500, |limit| i32::try_from(limit).unwrap_or(i32::MAX));
    params.count(count);
    params.reverse(true);

    let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
    let response = self.inner.get_executions(params).await?;
    let ts_init = self.generate_ts_init();

    let mut reports = Vec::new();
    for exec in response {
        // For a bounded query an execution must carry a transact time inside the range.
        if start.is_some() || end.is_some() {
            let Some(timestamp) = exec.transact_time else {
                log::debug!("Skipping fill report without transact_time for bounded query");
                continue;
            };
            if start.is_some_and(|start| timestamp < start)
                || end.is_some_and(|end| timestamp > end)
            {
                continue;
            }
        }

        let Some(symbol) = exec.symbol else {
            log::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
            continue;
        };
        let symbol_str = symbol.to_string();
        let instrument = match self.instrument_from_cache(symbol) {
            Ok(instrument) => instrument,
            Err(e) => {
                log::error!(
                    "Instrument not found in cache for execution parsing: symbol={symbol_str}, {e}"
                );
                continue;
            }
        };

        match parse_fill_report(&exec, &instrument, ts_init) {
            Ok(report) => reports.push(report),
            Err(e) => {
                // Errors with these message prefixes are expected skips, so they are
                // logged at debug level only.
                let error_msg = e.to_string();
                if error_msg.starts_with("Skipping non-trade execution")
                    || error_msg.starts_with("Skipping execution without order_id")
                {
                    log::debug!("{e}");
                } else {
                    log::error!("Failed to parse fill report: {e}");
                }
            }
        }
    }

    Ok(reports)
}
/// Requests positions (with `count = 500`) and parses them into position status
/// reports.
///
/// Positions whose instrument is not cached or that fail to parse are logged and
/// skipped rather than failing the whole call.
///
/// # Errors
///
/// Returns an error if the parameters fail to build or the request fails.
pub async fn request_position_status_reports(
    &self,
) -> anyhow::Result<Vec<PositionStatusReport>> {
    let request = GetPositionParamsBuilder::default()
        .count(500)
        .build()
        .map_err(|e| anyhow::anyhow!(e))?;
    let positions = self.inner.get_positions(request).await?;
    let ts_init = self.generate_ts_init();

    let mut reports = Vec::new();
    for pos in positions {
        let lookup = self.instrument_from_cache(pos.symbol).inspect_err(|e| {
            log::error!(
                "Instrument not found in cache for position parsing: symbol={}, {e}",
                pos.symbol.as_str(),
            );
        });
        let Ok(instrument) = lookup else {
            continue;
        };

        match parse_position_report(&pos, &instrument, ts_init) {
            Ok(report) => reports.push(report),
            Err(e) => log::error!("Failed to parse position report: {e}"),
        }
    }

    Ok(reports)
}
/// Sends a leverage update for `symbol` and returns the resulting position status
/// report.
///
/// The request is sent first; the instrument is looked up in the cache only
/// afterwards, to parse the response.
///
/// # Errors
///
/// Returns an error if the request fails, the instrument is not in the cache, or the
/// response cannot be parsed into a report.
pub async fn update_position_leverage(
    &self,
    symbol: &str,
    leverage: f64,
) -> anyhow::Result<PositionStatusReport> {
    let position = self
        .inner
        .update_position_leverage(PostPositionLeverageParams {
            symbol: symbol.to_string(),
            leverage,
            target_account_id: None,
        })
        .await?;

    let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
    let ts_init = self.generate_ts_init();
    parse_position_report(&position, &instrument, ts_init)
}
}
#[cfg(test)]
mod tests {
use nautilus_core::UUID4;
use nautilus_model::enums::OrderStatus;
use rstest::rstest;
use serde_json::json;
use super::*;
/// Builds a minimal accepted BUY LIMIT `OrderStatusReport` for `XBTUSD.BITMEX` with
/// the given identifiers and contingency type, attaching an order list ID when one
/// is supplied.
fn build_report(
    client_order_id: &str,
    venue_order_id: &str,
    contingency_type: ContingencyType,
    order_list_id: Option<&str>,
) -> OrderStatusReport {
    let base = OrderStatusReport::new(
        AccountId::from("BITMEX-1"),
        InstrumentId::from("XBTUSD.BITMEX"),
        Some(ClientOrderId::from(client_order_id)),
        VenueOrderId::from(venue_order_id),
        OrderSide::Buy,
        OrderType::Limit,
        TimeInForce::Gtc,
        OrderStatus::Accepted,
        Quantity::new(100.0, 0),
        Quantity::default(),
        UnixNanos::from(1_u64),
        UnixNanos::from(1_u64),
        UnixNanos::from(1_u64),
        Some(UUID4::new()),
    );

    let with_list = match order_list_id {
        Some(list_id) => base.with_order_list_id(OrderListId::from(list_id)),
        None => base,
    };

    with_list.with_contingency_type(contingency_type)
}
/// Signing a request yields the three authentication headers, and the API key
/// header carries the configured key.
#[rstest]
fn test_sign_request_generates_correct_headers() {
    let raw_client = BitmexRawHttpClient::with_credentials(
        "test_api_key".to_string(),
        "test_api_secret".to_string(),
        "http://localhost:8080".to_string(),
        60,
        3,
        1_000,
        10_000,
        10_000,
        10,
        120,
        None,
    )
    .expect("Failed to create test client");

    let signed = raw_client
        .sign_request(&Method::GET, "/api/v1/order", None)
        .unwrap();

    for header in ["api-key", "api-signature", "api-expires"] {
        assert!(signed.contains_key(header));
    }
    assert_eq!(signed.get("api-key").unwrap(), "test_api_key");
}
/// The signature changes when a request body is included in the signed payload.
#[rstest]
fn test_sign_request_with_body() {
    let raw_client = BitmexRawHttpClient::with_credentials(
        "test_api_key".to_string(),
        "test_api_secret".to_string(),
        "http://localhost:8080".to_string(),
        60,
        3,
        1_000,
        10_000,
        10_000,
        10,
        120,
        None,
    )
    .expect("Failed to create test client");

    let payload = serde_json::to_vec(&json!({"symbol": "XBTUSD", "orderQty": 100})).unwrap();

    let bare = raw_client
        .sign_request(&Method::POST, "/api/v1/order", None)
        .unwrap();
    let with_payload = raw_client
        .sign_request(&Method::POST, "/api/v1/order", Some(&payload))
        .unwrap();

    let bare_signature = bare.get("api-signature").unwrap();
    let payload_signature = with_payload.get("api-signature").unwrap();
    assert_ne!(bare_signature, payload_signature);
}
/// Two clients that differ only in one constructor argument (10_000 vs 30_000)
/// produce `api-expires` values roughly 20 apart.
// NOTE(review): the differing argument is presumably the receive window in
// milliseconds and `api-expires` a timestamp in seconds — confirm against
// `with_credentials` and `sign_request`.
#[rstest]
fn test_sign_request_uses_custom_recv_window() {
    let make_client = |window| {
        BitmexRawHttpClient::with_credentials(
            "test_api_key".to_string(),
            "test_api_secret".to_string(),
            "http://localhost:8080".to_string(),
            60,
            3,
            1_000,
            10_000,
            window,
            10,
            120,
            None,
        )
        .expect("Failed to create test client")
    };
    let client_default = make_client(10_000);
    let client_custom = make_client(30_000);

    let headers_default = client_default
        .sign_request(&Method::GET, "/api/v1/order", None)
        .unwrap();
    let headers_custom = client_custom
        .sign_request(&Method::GET, "/api/v1/order", None)
        .unwrap();

    let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
    let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();

    let now = Utc::now().timestamp();
    assert!(expires_default > now);
    assert!(expires_custom > now);
    assert!(expires_custom > expires_default);

    // Allow some slack around the expected gap of 20 for timing between the two calls.
    let gap = expires_custom - expires_default;
    assert!((18..=25).contains(&gap));
}
/// Reports that share an order list ID are linked to every other report in that list.
#[rstest]
fn test_populate_linked_order_ids_from_order_list() {
    let base = "O-20250922-002219-001-000";
    let entry = format!("{base}-1");
    let stop = format!("{base}-2");
    let take = format!("{base}-3");
    let cid = |raw: &String| ClientOrderId::from(raw.as_str());

    let mut reports = vec![
        build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
        build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
        build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
    ];

    BitmexHttpClient::populate_linked_order_ids(&mut reports);

    // Each report links to the two other orders, in entry/stop/take order.
    let expected_entry_links = Some(vec![cid(&stop), cid(&take)]);
    let expected_stop_links = Some(vec![cid(&entry), cid(&take)]);
    let expected_take_links = Some(vec![cid(&entry), cid(&stop)]);

    assert_eq!(reports[0].linked_order_ids, expected_entry_links);
    assert_eq!(reports[1].linked_order_ids, expected_stop_links);
    assert_eq!(reports[2].linked_order_ids, expected_take_links);
}
/// Without an order list ID, contingent reports whose client order IDs share a
/// common prefix are still linked to each other.
#[rstest]
fn test_populate_linked_order_ids_from_id_prefix() {
    let base = "O-20250922-002220-001-000";
    let entry = format!("{base}-1");
    let stop = format!("{base}-2");
    let take = format!("{base}-3");
    let cid = |raw: &String| ClientOrderId::from(raw.as_str());

    let mut reports = vec![
        build_report(&entry, "V-1", ContingencyType::Oto, None),
        build_report(&stop, "V-2", ContingencyType::Ouo, None),
        build_report(&take, "V-3", ContingencyType::Ouo, None),
    ];

    BitmexHttpClient::populate_linked_order_ids(&mut reports);

    // Each report links to the two other orders, in entry/stop/take order.
    let expected_entry_links = Some(vec![cid(&stop), cid(&take)]);
    let expected_stop_links = Some(vec![cid(&entry), cid(&take)]);
    let expected_take_links = Some(vec![cid(&entry), cid(&stop)]);

    assert_eq!(reports[0].linked_order_ids, expected_entry_links);
    assert_eq!(reports[1].linked_order_ids, expected_stop_links);
    assert_eq!(reports[2].linked_order_ids, expected_take_links);
}
/// A non-contingent order is never linked, and a contingent order left without any
/// link has its contingency type reset to `NoContingency`.
#[rstest]
fn test_populate_linked_order_ids_respects_non_contingent_orders() {
    let base = "O-20250922-002221-001-000";
    let entry = format!("{base}-1");
    let passive = format!("{base}-2");

    let mut reports = vec![
        build_report(&entry, "V-1", ContingencyType::NoContingency, None),
        build_report(&passive, "V-2", ContingencyType::Ouo, None),
    ];

    BitmexHttpClient::populate_linked_order_ids(&mut reports);

    for report in &reports {
        assert!(report.linked_order_ids.is_none());
    }
    assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
}
}