Skip to main content

nautilus_bitmex/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the [BitMEX](https://bitmex.com) REST API.
17//!
18//! This module defines and implements a [`BitmexHttpClient`] for
19//! sending requests to various BitMEX endpoints. It handles request signing
20//! (when credentials are provided), constructs valid HTTP requests
21//! using the [`HttpClient`], and parses the responses back into structured data or a [`BitmexHttpError`].
22//!
23//! BitMEX API reference <https://www.bitmex.com/api/explorer/#/default>.
24
25use std::{
26    collections::HashMap,
27    num::NonZeroU32,
28    sync::{
29        Arc, LazyLock, RwLock,
30        atomic::{AtomicBool, Ordering},
31    },
32};
33
34use chrono::{DateTime, Utc};
35use dashmap::DashMap;
36use nautilus_core::{
37    AtomicTime, UUID4, UnixNanos,
38    consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
39    env::get_or_env_var_opt,
40    time::get_atomic_clock_realtime,
41};
42use nautilus_model::{
43    data::{Bar, BarType, TradeTick},
44    enums::{
45        AccountType, AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType,
46        PriceType, TimeInForce, TrailingOffsetType, TriggerType,
47    },
48    events::AccountState,
49    identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
50    instruments::{Instrument as InstrumentTrait, InstrumentAny},
51    reports::{FillReport, OrderStatusReport, PositionStatusReport},
52    types::{Price, Quantity},
53};
54use nautilus_network::{
55    http::{HttpClient, Method, StatusCode, USER_AGENT},
56    ratelimiter::quota::Quota,
57    retry::{RetryConfig, RetryManager},
58};
59use serde::{Deserialize, Serialize, de::DeserializeOwned};
60use serde_json::Value;
61use tokio_util::sync::CancellationToken;
62use ustr::Ustr;
63
64use super::{
65    error::{BitmexErrorResponse, BitmexHttpError},
66    models::{
67        BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
68        BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
69    },
70    query::{
71        DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
72        GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
73        GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder,
74        PostCancelAllAfterParams, PostOrderParams, PostPositionLeverageParams, PutOrderParams,
75    },
76};
77use crate::{
78    common::{
79        consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
80        credential::{Credential, credential_env_vars},
81        enums::{
82            BitmexContingencyType, BitmexExecInstruction, BitmexOrderStatus, BitmexOrderType,
83            BitmexPegPriceType, BitmexSide, BitmexTimeInForce,
84        },
85        parse::{parse_account_balance, quantity_to_u32},
86    },
87    http::{
88        parse::{
89            InstrumentParseResult, parse_fill_report, parse_instrument_any,
90            parse_order_status_report, parse_position_report, parse_trade, parse_trade_bin,
91        },
92        query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
93    },
94    websocket::messages::BitmexMarginMsg,
95};
96
/// Default BitMEX REST API rate limits.
///
/// BitMEX implements a dual-layer rate limiting system:
/// - Primary limit: 120 requests per minute for authenticated users (30 for unauthenticated).
/// - Secondary limit: 10 requests per second burst limit for specific endpoints.
///
/// Per-second burst limit used when no `max_requests_per_second` override is supplied.
const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
/// Per-minute limit used by the credentialed constructor when no override is supplied.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
/// Per-minute limit used by the credential-less constructor when no override is supplied.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;

/// Rate limiter key under which the per-second quota is registered.
const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
/// Rate limiter key under which the per-minute quota is registered.
const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";
108
109static RATE_LIMIT_KEYS: LazyLock<Vec<Ustr>> = LazyLock::new(|| {
110    vec![
111        Ustr::from(BITMEX_GLOBAL_RATE_KEY),
112        Ustr::from(BITMEX_MINUTE_RATE_KEY),
113    ]
114});
115
/// Represents a BitMEX HTTP response.
///
/// Generic envelope that (de)serializes a list of `T` under a single `data` field.
#[derive(Debug, Serialize, Deserialize)]
pub struct BitmexResponse<T> {
    /// The typed data returned by the BitMEX endpoint.
    pub data: Vec<T>,
}
122
/// Provides a lower-level HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
///
/// This client wraps the underlying [`HttpClient`] to handle functionality
/// specific to BitMEX, such as request signing (for authenticated endpoints),
/// forming request URLs, and deserializing responses into specific data models.
///
/// # Connection Management
///
/// The client uses HTTP keep-alive for connection pooling with a 90-second idle timeout,
/// which matches BitMEX's server-side keep-alive timeout. Connections are automatically
/// reused for subsequent requests to minimize latency.
///
/// # Rate Limiting
///
/// BitMEX enforces the following rate limits:
/// - 120 requests per minute for authenticated users (30 for unauthenticated).
/// - 10 requests per second burst limit for certain endpoints (order management).
///
/// The client automatically respects these limits through the configured quota.
///
/// # Cloning
///
/// The cancellation token lives behind an `Arc`, so clones of this client share the
/// same token.
#[derive(Debug, Clone)]
pub struct BitmexRawHttpClient {
    /// Base URL that every endpoint path is appended to.
    base_url: String,
    /// Underlying HTTP client, configured with the default headers and rate limit quotas.
    client: HttpClient,
    /// Credential used to sign requests; `None` when the client was built without credentials.
    credential: Option<Credential>,
    /// Receive window in milliseconds, used to derive the `api-expires` header when signing.
    recv_window_ms: u64,
    /// Retry manager that every request is executed through.
    retry_manager: RetryManager<BitmexHttpError>,
    /// Token used to cancel requests; behind a lock so it can be replaced through `&self`.
    cancellation_token: Arc<RwLock<CancellationToken>>,
}
151
152impl Default for BitmexRawHttpClient {
153    fn default() -> Self {
154        Self::new(None, Some(60), None, None, None, None, None, None, None)
155            .expect("Failed to create default BitmexHttpInnerClient")
156    }
157}
158
159impl BitmexRawHttpClient {
160    /// Creates a new [`BitmexRawHttpClient`] using the default BitMEX HTTP URL,
161    /// optionally overridden with a custom base URL.
162    ///
163    /// This version of the client has **no credentials**, so it can only
164    /// call publicly accessible endpoints.
165    ///
166    /// # Errors
167    ///
168    /// Returns an error if the retry manager cannot be created.
169    #[allow(clippy::too_many_arguments)]
170    pub fn new(
171        base_url: Option<String>,
172        timeout_secs: Option<u64>,
173        max_retries: Option<u32>,
174        retry_delay_ms: Option<u64>,
175        retry_delay_max_ms: Option<u64>,
176        recv_window_ms: Option<u64>,
177        max_requests_per_second: Option<u32>,
178        max_requests_per_minute: Option<u32>,
179        proxy_url: Option<String>,
180    ) -> Result<Self, BitmexHttpError> {
181        let retry_config = RetryConfig {
182            max_retries: max_retries.unwrap_or(3),
183            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
184            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
185            backoff_factor: 2.0,
186            jitter_ms: 1000,
187            operation_timeout_ms: Some(60_000),
188            immediate_first: false,
189            max_elapsed_ms: Some(180_000),
190        };
191
192        let retry_manager = RetryManager::new(retry_config);
193
194        let max_req_per_sec =
195            max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
196        let max_req_per_min =
197            max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED);
198
199        Ok(Self {
200            base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
201            client: HttpClient::new(
202                Self::default_headers(),
203                vec![],
204                Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min)?,
205                Some(Self::default_quota(max_req_per_sec)?),
206                timeout_secs,
207                proxy_url,
208            )
209            .map_err(|e| {
210                BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
211            })?,
212            credential: None,
213            recv_window_ms: recv_window_ms.unwrap_or(10_000),
214            retry_manager,
215            cancellation_token: Arc::new(RwLock::new(CancellationToken::new())),
216        })
217    }
218
219    /// Creates a new [`BitmexRawHttpClient`] configured with credentials
220    /// for authenticated requests, optionally using a custom base URL.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the retry manager cannot be created.
225    #[allow(clippy::too_many_arguments)]
226    pub fn with_credentials(
227        api_key: String,
228        api_secret: String,
229        base_url: String,
230        timeout_secs: Option<u64>,
231        max_retries: Option<u32>,
232        retry_delay_ms: Option<u64>,
233        retry_delay_max_ms: Option<u64>,
234        recv_window_ms: Option<u64>,
235        max_requests_per_second: Option<u32>,
236        max_requests_per_minute: Option<u32>,
237        proxy_url: Option<String>,
238    ) -> Result<Self, BitmexHttpError> {
239        let retry_config = RetryConfig {
240            max_retries: max_retries.unwrap_or(3),
241            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
242            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
243            backoff_factor: 2.0,
244            jitter_ms: 1000,
245            operation_timeout_ms: Some(60_000),
246            immediate_first: false,
247            max_elapsed_ms: Some(180_000),
248        };
249
250        let retry_manager = RetryManager::new(retry_config);
251
252        let max_req_per_sec =
253            max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
254        let max_req_per_min =
255            max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED);
256
257        Ok(Self {
258            base_url,
259            client: HttpClient::new(
260                Self::default_headers(),
261                vec![],
262                Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min)?,
263                Some(Self::default_quota(max_req_per_sec)?),
264                timeout_secs,
265                proxy_url,
266            )
267            .map_err(|e| {
268                BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
269            })?,
270            credential: Some(Credential::new(api_key, api_secret)),
271            recv_window_ms: recv_window_ms.unwrap_or(10_000),
272            retry_manager,
273            cancellation_token: Arc::new(RwLock::new(CancellationToken::new())),
274        })
275    }
276
277    fn default_headers() -> HashMap<String, String> {
278        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
279    }
280
281    fn default_quota(max_requests_per_second: u32) -> Result<Quota, BitmexHttpError> {
282        let burst = NonZeroU32::new(max_requests_per_second)
283            .unwrap_or(NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).expect("non-zero"));
284        Quota::per_second(burst).ok_or_else(|| {
285            BitmexHttpError::ValidationError(format!(
286                "Invalid max_requests_per_second: {max_requests_per_second} exceeds maximum"
287            ))
288        })
289    }
290
291    fn rate_limiter_quotas(
292        max_requests_per_second: u32,
293        max_requests_per_minute: u32,
294    ) -> Result<Vec<(String, Quota)>, BitmexHttpError> {
295        let per_sec_quota = Self::default_quota(max_requests_per_second)?;
296        let per_min_quota =
297            Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
298                NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED)
299                    .expect("non-zero")
300            }));
301
302        Ok(vec![
303            (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
304            (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
305        ])
306    }
307
308    fn rate_limit_keys() -> Vec<Ustr> {
309        RATE_LIMIT_KEYS.clone()
310    }
311
312    /// Cancel all pending HTTP requests.
313    ///
314    /// # Panics
315    ///
316    /// Panics if the cancellation token lock is poisoned.
317    pub fn cancel_all_requests(&self) {
318        self.cancellation_token
319            .read()
320            .expect("cancellation token lock poisoned")
321            .cancel();
322    }
323
324    /// Replace the cancellation token so new requests can proceed.
325    ///
326    /// # Panics
327    ///
328    /// Panics if the cancellation token lock is poisoned.
329    pub fn reset_cancellation_token(&self) {
330        *self
331            .cancellation_token
332            .write()
333            .expect("cancellation token lock poisoned") = CancellationToken::new();
334    }
335
336    /// Get a clone of the cancellation token for this client.
337    ///
338    /// # Panics
339    ///
340    /// Panics if the cancellation token lock is poisoned.
341    pub fn cancellation_token(&self) -> CancellationToken {
342        self.cancellation_token
343            .read()
344            .expect("cancellation token lock poisoned")
345            .clone()
346    }
347
348    fn sign_request(
349        &self,
350        method: &Method,
351        endpoint: &str,
352        body: Option<&[u8]>,
353    ) -> Result<HashMap<String, String>, BitmexHttpError> {
354        let credential = self
355            .credential
356            .as_ref()
357            .ok_or(BitmexHttpError::MissingCredentials)?;
358
359        let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
360        let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
361
362        let full_path = if endpoint.starts_with("/api/v1") {
363            endpoint.to_string()
364        } else {
365            format!("/api/v1{endpoint}")
366        };
367
368        let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
369
370        let mut headers = HashMap::new();
371        headers.insert("api-expires".to_string(), expires.to_string());
372        headers.insert("api-key".to_string(), credential.api_key().to_string());
373        headers.insert("api-signature".to_string(), signature);
374
375        // Add Content-Type header for form-encoded body
376        if body.is_some()
377            && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
378        {
379            headers.insert(
380                "Content-Type".to_string(),
381                "application/x-www-form-urlencoded".to_string(),
382            );
383        }
384
385        Ok(headers)
386    }
387
    /// Sends a request to `endpoint` and deserializes a successful JSON response into `T`.
    ///
    /// `params` are URL-encoded into the query string for `GET` and `DELETE` requests
    /// only; for any other method they are ignored, and callers supply the payload
    /// through `body`. When `authenticate` is true the request is signed inside the
    /// retried operation, so each attempt is signed afresh.
    ///
    /// The operation runs through the retry manager together with a clone of the
    /// cancellation token taken at call time.
    ///
    /// # Errors
    ///
    /// Returns an error if the params cannot be serialized, signing fails, the request
    /// fails, the response body cannot be deserialized, or the venue responds with a
    /// non-success status.
    async fn send_request<T: DeserializeOwned, P: Serialize>(
        &self,
        method: Method,
        endpoint: &str,
        params: Option<&P>,
        body: Option<Vec<u8>>,
        authenticate: bool,
    ) -> Result<T, BitmexHttpError> {
        let endpoint = endpoint.to_string();
        let method_clone = method.clone();
        let body_clone = body.clone();

        // Serialize params before closure to avoid reference lifetime issues
        // Query params are used with GET and DELETE methods
        let params_str = if method == Method::GET || method == Method::DELETE {
            params
                .map(serde_urlencoded::to_string)
                .transpose()
                .map_err(|e| {
                    BitmexHttpError::JsonError(format!("Failed to serialize params: {e}"))
                })?
        } else {
            None
        };

        // An empty query string is dropped so no dangling `?` is appended
        let full_endpoint = match params_str {
            Some(ref query) if !query.is_empty() => format!("{endpoint}?{query}"),
            _ => endpoint.clone(),
        };

        let url = format!("{}{}", self.base_url, full_endpoint);

        // Invoked once per attempt; clones its inputs so each attempt owns its data
        let operation = || {
            let url = url.clone();
            let method = method_clone.clone();
            let body = body_clone.clone();
            let full_endpoint = full_endpoint.clone();

            async move {
                // The signature covers the endpoint including its query string
                let headers = if authenticate {
                    Some(self.sign_request(&method, &full_endpoint, body.as_deref())?)
                } else {
                    None
                };

                let rate_keys = Self::rate_limit_keys();
                let resp = self
                    .client
                    .request_with_ustr_keys(method, url, None, headers, body, None, Some(rate_keys))
                    .await?;

                // Prefer the structured BitMEX error payload; fall back to status + raw body
                if resp.status.is_success() {
                    serde_json::from_slice(&resp.body).map_err(Into::into)
                } else if let Ok(error_resp) =
                    serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
                {
                    Err(error_resp.into())
                } else {
                    Err(BitmexHttpError::UnexpectedStatus {
                        status: StatusCode::from_u16(resp.status.as_u16())
                            .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
                        body: String::from_utf8_lossy(&resp.body).to_string(),
                    })
                }
            }
        };

        // Retry strategy based on BitMEX error responses and HTTP status codes:
        //
        // 1. Network errors: always retry (transient connection issues).
        // 2. HTTP 5xx/429: server errors and rate limiting should be retried.
        // 3. BitMEX JSON errors with specific handling:
        //    - "RateLimitError": explicit rate limit error from BitMEX.
        //    - "HTTPError": generic error name used by BitMEX for various issues
        //      Only retry if message contains "rate limit" to avoid retrying
        //      non-transient errors like authentication failures, validation errors,
        //      insufficient balance, etc. which also return as "HTTPError".
        //
        // Note: BitMEX returns many permanent errors as "HTTPError" (e.g., "Invalid orderQty",
        // "Account has insufficient Available Balance", "Invalid API Key") which should NOT
        // be retried. We only retry when the message explicitly mentions rate limiting.
        //
        // See tests in tests/http.rs for retry behavior validation.
        let should_retry = |error: &BitmexHttpError| -> bool {
            match error {
                BitmexHttpError::NetworkError(_) => true,
                BitmexHttpError::UnexpectedStatus { status, .. } => {
                    status.as_u16() >= 500 || status.as_u16() == 429
                }
                BitmexHttpError::BitmexError {
                    error_name,
                    message,
                } => {
                    error_name == "RateLimitError"
                        || (error_name == "HTTPError"
                            && message.to_lowercase().contains("rate limit"))
                }
                _ => false,
            }
        };

        // Maps a message from the retry manager into an error; the exact message
        // "canceled" becomes a `Canceled` error, anything else a network error
        let create_error = |msg: String| -> BitmexHttpError {
            if msg == "canceled" {
                BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
            } else {
                BitmexHttpError::NetworkError(msg)
            }
        };

        let cancel_token = self.cancellation_token();

        // The bare endpoint (without query string) is passed as the first argument
        self.retry_manager
            .execute_with_retry_with_cancel(
                endpoint.as_str(),
                operation,
                should_retry,
                create_error,
                &cancel_token,
            )
            .await
    }
509
510    /// Get all instruments.
511    ///
512    /// # Errors
513    ///
514    /// Returns an error if the request fails, the response cannot be parsed, or the API returns an error.
515    pub async fn get_instruments(
516        &self,
517        active_only: bool,
518    ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
519        let path = if active_only {
520            "/instrument/active"
521        } else {
522            "/instrument"
523        };
524        self.send_request::<_, ()>(Method::GET, path, None, None, false)
525            .await
526    }
527
528    /// Requests the current server time from BitMEX.
529    ///
530    /// Retrieves the BitMEX API info including the system time in Unix timestamp (milliseconds).
531    /// This is useful for synchronizing local clocks with the exchange server and logging time drift.
532    ///
533    /// # Errors
534    ///
535    /// Returns an error if the HTTP request fails or if the response body
536    /// cannot be parsed into [`BitmexApiInfo`].
537    pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
538        let response: BitmexApiInfo = self
539            .send_request::<_, ()>(Method::GET, "", None, None, false)
540            .await?;
541        Ok(response.timestamp)
542    }
543
544    /// Get the instrument definition for the specified symbol.
545    ///
546    /// BitMEX responds to `/instrument?symbol=...` with an array, even when
547    /// a single symbol is requested. This helper returns the first element of
548    /// that array and yields `Ok(None)` when the venue returns an empty list
549    /// (e.g. unknown symbol).
550    ///
551    /// # Errors
552    ///
553    /// Returns an error if the request fails or the payload cannot be deserialized.
554    pub async fn get_instrument(
555        &self,
556        symbol: &str,
557    ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
558        let path = &format!("/instrument?symbol={symbol}");
559        let instruments: Vec<BitmexInstrument> = self
560            .send_request::<_, ()>(Method::GET, path, None, None, false)
561            .await?;
562
563        Ok(instruments.into_iter().next())
564    }
565
566    /// Get user wallet information.
567    ///
568    /// # Errors
569    ///
570    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
571    pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
572        let endpoint = "/user/wallet";
573        self.send_request::<_, ()>(Method::GET, endpoint, None, None, true)
574            .await
575    }
576
577    /// Get user margin information for a specific currency.
578    ///
579    /// # Errors
580    ///
581    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
582    pub async fn get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
583        let path = format!("/user/margin?currency={currency}");
584        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
585            .await
586    }
587
588    /// Get user margin information for all currencies.
589    ///
590    /// # Errors
591    ///
592    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
593    pub async fn get_all_margins(&self) -> Result<Vec<BitmexMargin>, BitmexHttpError> {
594        self.send_request::<_, ()>(Method::GET, "/user/margin?currency=all", None, None, true)
595            .await
596    }
597
598    /// Get historical trades.
599    ///
600    /// # Errors
601    ///
602    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
603    pub async fn get_trades(
604        &self,
605        params: GetTradeParams,
606    ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
607        self.send_request(Method::GET, "/trade", Some(&params), None, true)
608            .await
609    }
610
611    /// Get bucketed (aggregated) trade data.
612    ///
613    /// # Errors
614    ///
615    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
616    pub async fn get_trade_bucketed(
617        &self,
618        params: GetTradeBucketedParams,
619    ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
620        self.send_request(Method::GET, "/trade/bucketed", Some(&params), None, true)
621            .await
622    }
623
624    /// Get user orders.
625    ///
626    /// # Errors
627    ///
628    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
629    pub async fn get_orders(
630        &self,
631        params: GetOrderParams,
632    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
633        self.send_request(Method::GET, "/order", Some(&params), None, true)
634            .await
635    }
636
637    /// Place a new order.
638    ///
639    /// # Errors
640    ///
641    /// Returns an error if credentials are missing, the request fails, order validation fails, or the API returns an error.
642    pub async fn place_order(&self, params: PostOrderParams) -> Result<Value, BitmexHttpError> {
643        // BitMEX spec requires form-encoded body for POST /order
644        let body = serde_urlencoded::to_string(&params)
645            .map_err(|e| {
646                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
647            })?
648            .into_bytes();
649        let path = "/order";
650        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
651            .await
652    }
653
654    /// Cancel user orders.
655    ///
656    /// # Errors
657    ///
658    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
659    pub async fn cancel_orders(&self, params: DeleteOrderParams) -> Result<Value, BitmexHttpError> {
660        // BitMEX spec requires form-encoded body for DELETE /order
661        let body = serde_urlencoded::to_string(&params)
662            .map_err(|e| {
663                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
664            })?
665            .into_bytes();
666        let path = "/order";
667        self.send_request::<_, ()>(Method::DELETE, path, None, Some(body), true)
668            .await
669    }
670
671    /// Amend an existing order.
672    ///
673    /// # Errors
674    ///
675    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
676    pub async fn amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
677        // BitMEX spec requires form-encoded body for PUT /order
678        let body = serde_urlencoded::to_string(&params)
679            .map_err(|e| {
680                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
681            })?
682            .into_bytes();
683        let path = "/order";
684        self.send_request::<_, ()>(Method::PUT, path, None, Some(body), true)
685            .await
686    }
687
688    /// Cancel all orders.
689    ///
690    /// # Errors
691    ///
692    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
693    ///
694    /// # References
695    ///
696    /// <https://www.bitmex.com/api/explorer/#!/Order/Order_cancelAll>
697    pub async fn cancel_all_orders(
698        &self,
699        params: DeleteAllOrdersParams,
700    ) -> Result<Value, BitmexHttpError> {
701        self.send_request(Method::DELETE, "/order/all", Some(&params), None, true)
702            .await
703    }
704
705    /// Set a dead man's switch (cancel all orders after timeout).
706    ///
707    /// Calling with `timeout=0` disarms the switch.
708    ///
709    /// # Errors
710    ///
711    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
712    ///
713    /// # References
714    ///
715    /// <https://www.bitmex.com/api/explorer/#!/Order/Order_cancelAllAfter>
716    pub async fn cancel_all_after(
717        &self,
718        params: PostCancelAllAfterParams,
719    ) -> Result<Value, BitmexHttpError> {
720        let body = serde_urlencoded::to_string(&params)
721            .map_err(|e| {
722                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
723            })?
724            .into_bytes();
725        self.send_request::<_, ()>(
726            Method::POST,
727            "/order/cancelAllAfter",
728            None,
729            Some(body),
730            true,
731        )
732        .await
733    }
734
735    /// Get user executions.
736    ///
737    /// # Errors
738    ///
739    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
740    pub async fn get_executions(
741        &self,
742        params: GetExecutionParams,
743    ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
744        let query = serde_urlencoded::to_string(&params).map_err(|e| {
745            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
746        })?;
747        let path = format!("/execution/tradeHistory?{query}");
748        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
749            .await
750    }
751
752    /// Get user positions.
753    ///
754    /// # Errors
755    ///
756    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
757    pub async fn get_positions(
758        &self,
759        params: GetPositionParams,
760    ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
761        self.send_request(Method::GET, "/position", Some(&params), None, true)
762            .await
763    }
764
765    /// Update position leverage.
766    ///
767    /// # Errors
768    ///
769    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
770    pub async fn update_position_leverage(
771        &self,
772        params: PostPositionLeverageParams,
773    ) -> Result<BitmexPosition, BitmexHttpError> {
774        // BitMEX spec requires form-encoded body for POST endpoints
775        let body = serde_urlencoded::to_string(&params)
776            .map_err(|e| {
777                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
778            })?
779            .into_bytes();
780        let path = "/position/leverage";
781        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
782            .await
783    }
784}
785
/// Provides an HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
///
/// This is the high-level client that wraps the inner client and provides
/// Nautilus-specific functionality for trading operations.
///
/// Cloning shares the inner client and both caches (they are held behind `Arc`),
/// while each clone gets its own `cache_initialized` flag seeded from the source's
/// value at clone time.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex", from_py_object)
)]
pub struct BitmexHttpClient {
    /// Cached instruments keyed by `Ustr`.
    // NOTE(review): keys are presumably venue symbols; confirm against the code that fills the cache.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Cached order type per client order ID.
    pub(crate) order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
    /// Reference to a static atomic clock.
    clock: &'static AtomicTime,
    /// Shared lower-level client.
    inner: Arc<BitmexRawHttpClient>,
    /// Whether the cache has been initialized.
    // NOTE(review): which cache this refers to is not visible in this section; presumably the instruments cache.
    cache_initialized: AtomicBool,
}
802
803impl Clone for BitmexHttpClient {
804    fn clone(&self) -> Self {
805        let cache_initialized = AtomicBool::new(false);
806
807        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
808        if is_initialized {
809            cache_initialized.store(true, Ordering::Release);
810        }
811
812        Self {
813            inner: self.inner.clone(),
814            instruments_cache: self.instruments_cache.clone(),
815            order_type_cache: self.order_type_cache.clone(),
816            cache_initialized,
817            clock: self.clock,
818        }
819    }
820}
821
822impl Default for BitmexHttpClient {
823    fn default() -> Self {
824        Self::new(
825            None,
826            None,
827            None,
828            false,
829            Some(60),
830            None,
831            None,
832            None,
833            None,
834            None,
835            None,
836            None, // proxy_url
837        )
838        .expect("Failed to create default BitmexHttpClient")
839    }
840}
841
842impl BitmexHttpClient {
843    /// Creates a new [`BitmexHttpClient`] instance.
844    ///
845    /// # Errors
846    ///
847    /// Returns an error if the HTTP client cannot be created.
848    #[allow(clippy::too_many_arguments)]
849    pub fn new(
850        base_url: Option<String>,
851        api_key: Option<String>,
852        api_secret: Option<String>,
853        testnet: bool,
854        timeout_secs: Option<u64>,
855        max_retries: Option<u32>,
856        retry_delay_ms: Option<u64>,
857        retry_delay_max_ms: Option<u64>,
858        recv_window_ms: Option<u64>,
859        max_requests_per_second: Option<u32>,
860        max_requests_per_minute: Option<u32>,
861        proxy_url: Option<String>,
862    ) -> Result<Self, BitmexHttpError> {
863        // Determine the base URL
864        let url = base_url.unwrap_or_else(|| {
865            if testnet {
866                BITMEX_HTTP_TESTNET_URL.to_string()
867            } else {
868                BITMEX_HTTP_URL.to_string()
869            }
870        });
871
872        let (key_var, secret_var) = credential_env_vars(testnet);
873        let api_key = get_or_env_var_opt(api_key, key_var);
874        let api_secret = get_or_env_var_opt(api_secret, secret_var);
875
876        let inner = match (api_key, api_secret) {
877            (Some(key), Some(secret)) => BitmexRawHttpClient::with_credentials(
878                key,
879                secret,
880                url,
881                timeout_secs,
882                max_retries,
883                retry_delay_ms,
884                retry_delay_max_ms,
885                recv_window_ms,
886                max_requests_per_second,
887                max_requests_per_minute,
888                proxy_url,
889            )?,
890            (Some(_), None) | (None, Some(_)) => {
891                return Err(BitmexHttpError::ValidationError(
892                    "Both api_key and api_secret must be provided, or neither".to_string(),
893                ));
894            }
895            (None, None) => BitmexRawHttpClient::new(
896                Some(url),
897                timeout_secs,
898                max_retries,
899                retry_delay_ms,
900                retry_delay_max_ms,
901                recv_window_ms,
902                max_requests_per_second,
903                max_requests_per_minute,
904                proxy_url,
905            )?,
906        };
907
908        Ok(Self {
909            inner: Arc::new(inner),
910            instruments_cache: Arc::new(DashMap::new()),
911            order_type_cache: Arc::new(DashMap::new()),
912            cache_initialized: AtomicBool::new(false),
913            clock: get_atomic_clock_realtime(),
914        })
915    }
916
917    /// Creates a new [`BitmexHttpClient`] instance using environment variables and
918    /// the default BitMEX HTTP base URL.
919    ///
920    /// # Errors
921    ///
922    /// Returns an error if required environment variables are not set or invalid.
923    pub fn from_env() -> anyhow::Result<Self> {
924        Self::with_credentials(
925            None, None, None, None, None, None, None, None, None, None, None,
926        )
927        .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
928    }
929
930    /// Creates a new [`BitmexHttpClient`] configured with credentials
931    /// for authenticated requests.
932    ///
933    /// If `api_key` or `api_secret` are `None`, they will be sourced from the
934    /// `BITMEX_API_KEY` and `BITMEX_API_SECRET` environment variables.
935    ///
936    /// # Errors
937    ///
938    /// Returns an error if one credential is provided without the other.
939    #[allow(clippy::too_many_arguments)]
940    pub fn with_credentials(
941        api_key: Option<String>,
942        api_secret: Option<String>,
943        base_url: Option<String>,
944        timeout_secs: Option<u64>,
945        max_retries: Option<u32>,
946        retry_delay_ms: Option<u64>,
947        retry_delay_max_ms: Option<u64>,
948        recv_window_ms: Option<u64>,
949        max_requests_per_second: Option<u32>,
950        max_requests_per_minute: Option<u32>,
951        proxy_url: Option<String>,
952    ) -> anyhow::Result<Self> {
953        // Determine testnet from URL first to select correct environment variables
954        let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
955
956        let (key_var, secret_var) = credential_env_vars(testnet);
957
958        let api_key = get_or_env_var_opt(api_key, key_var);
959        let api_secret = get_or_env_var_opt(api_secret, secret_var);
960
961        // If we're trying to create an authenticated client, we need both key and secret
962        if api_key.is_some() && api_secret.is_none() {
963            anyhow::bail!("{secret_var} is required when {key_var} is provided");
964        }
965
966        if api_key.is_none() && api_secret.is_some() {
967            anyhow::bail!("{key_var} is required when {secret_var} is provided");
968        }
969
970        Self::new(
971            base_url,
972            api_key,
973            api_secret,
974            testnet,
975            timeout_secs,
976            max_retries,
977            retry_delay_ms,
978            retry_delay_max_ms,
979            recv_window_ms,
980            max_requests_per_second,
981            max_requests_per_minute,
982            proxy_url,
983        )
984        .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
985    }
986
987    /// Returns the base url being used by the client.
988    #[must_use]
989    pub fn base_url(&self) -> &str {
990        self.inner.base_url.as_str()
991    }
992
993    /// Returns the public API key being used by the client.
994    #[must_use]
995    pub fn api_key(&self) -> Option<&str> {
996        self.inner.credential.as_ref().map(|c| c.api_key())
997    }
998
999    /// Returns a masked version of the API key for logging purposes.
1000    #[must_use]
1001    pub fn api_key_masked(&self) -> Option<String> {
1002        self.inner.credential.as_ref().map(|c| c.api_key_masked())
1003    }
1004
1005    /// Requests the current server time from BitMEX.
1006    ///
1007    /// Returns the BitMEX system time as a Unix timestamp in milliseconds.
1008    ///
1009    /// # Errors
1010    ///
1011    /// Returns an error if the HTTP request fails or if the response cannot be parsed.
1012    pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
1013        self.inner.get_server_time().await
1014    }
1015
1016    /// Sets the dead man's switch (cancel all orders after timeout).
1017    ///
1018    /// Calling with `timeout_ms=0` disarms the switch.
1019    ///
1020    /// # Errors
1021    ///
1022    /// Returns an error if the HTTP request fails.
1023    pub async fn cancel_all_after(&self, timeout_ms: u64) -> anyhow::Result<()> {
1024        let params = PostCancelAllAfterParams {
1025            timeout: timeout_ms,
1026        };
1027        self.inner.cancel_all_after(params).await?;
1028        Ok(())
1029    }
1030
1031    /// Generates a timestamp for initialization.
1032    fn generate_ts_init(&self) -> UnixNanos {
1033        self.clock.get_time_ns()
1034    }
1035
1036    /// Check if the order has a contingency type that requires linking.
1037    fn is_contingent_order(contingency_type: ContingencyType) -> bool {
1038        matches!(
1039            contingency_type,
1040            ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
1041        )
1042    }
1043
1044    /// Check if the order is a parent in contingency relationships.
1045    fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
1046        matches!(
1047            contingency_type,
1048            ContingencyType::Oco | ContingencyType::Oto
1049        )
1050    }
1051
1052    /// Populate missing `linked_order_ids` for contingency orders by grouping on `order_list_id`.
1053    fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
1054        let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
1055        let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
1056        let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
1057        let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();
1058
1059        for report in reports.iter() {
1060            let Some(client_order_id) = report.client_order_id else {
1061                continue;
1062            };
1063
1064            if let Some(order_list_id) = report.order_list_id {
1065                order_list_groups
1066                    .entry(order_list_id)
1067                    .or_default()
1068                    .push(client_order_id);
1069
1070                if Self::is_parent_contingency(report.contingency_type) {
1071                    order_list_parents
1072                        .entry(order_list_id)
1073                        .or_insert(client_order_id);
1074                }
1075            }
1076
1077            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
1078                && Self::is_contingent_order(report.contingency_type)
1079            {
1080                prefix_groups
1081                    .entry(base.to_owned())
1082                    .or_default()
1083                    .push(client_order_id);
1084
1085                if Self::is_parent_contingency(report.contingency_type) {
1086                    prefix_parents
1087                        .entry(base.to_owned())
1088                        .or_insert(client_order_id);
1089                }
1090            }
1091        }
1092
1093        for report in reports.iter_mut() {
1094            let Some(client_order_id) = report.client_order_id else {
1095                continue;
1096            };
1097
1098            if report.linked_order_ids.is_some() {
1099                continue;
1100            }
1101
1102            // Only process contingent orders
1103            if !Self::is_contingent_order(report.contingency_type) {
1104                continue;
1105            }
1106
1107            if let Some(order_list_id) = report.order_list_id
1108                && let Some(group) = order_list_groups.get(&order_list_id)
1109            {
1110                let mut linked: Vec<ClientOrderId> = group
1111                    .iter()
1112                    .copied()
1113                    .filter(|candidate| candidate != &client_order_id)
1114                    .collect();
1115
1116                if !linked.is_empty() {
1117                    if let Some(parent_id) = order_list_parents.get(&order_list_id) {
1118                        if client_order_id == *parent_id {
1119                            report.parent_order_id = None;
1120                        } else {
1121                            linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
1122                            report.parent_order_id = Some(*parent_id);
1123                        }
1124                    } else {
1125                        report.parent_order_id = None;
1126                    }
1127
1128                    log::trace!(
1129                        "BitMEX linked ids sourced from order list id: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, linked_order_ids={:?}",
1130                        client_order_id,
1131                        order_list_id,
1132                        report.contingency_type,
1133                        linked,
1134                    );
1135                    report.linked_order_ids = Some(linked);
1136                    continue;
1137                }
1138
1139                log::trace!(
1140                    "BitMEX order list id group had no peers: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, order_list_group={:?}",
1141                    client_order_id,
1142                    order_list_id,
1143                    report.contingency_type,
1144                    group,
1145                );
1146                report.parent_order_id = None;
1147            } else if report.order_list_id.is_none() {
1148                report.parent_order_id = None;
1149            }
1150
1151            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
1152                && let Some(group) = prefix_groups.get(base)
1153            {
1154                let mut linked: Vec<ClientOrderId> = group
1155                    .iter()
1156                    .copied()
1157                    .filter(|candidate| candidate != &client_order_id)
1158                    .collect();
1159
1160                if !linked.is_empty() {
1161                    if let Some(parent_id) = prefix_parents.get(base) {
1162                        if client_order_id == *parent_id {
1163                            report.parent_order_id = None;
1164                        } else {
1165                            linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
1166                            report.parent_order_id = Some(*parent_id);
1167                        }
1168                    } else {
1169                        report.parent_order_id = None;
1170                    }
1171
1172                    log::trace!(
1173                        "BitMEX linked ids constructed from client order id prefix: client_order_id={:?}, contingency_type={:?}, base={}, linked_order_ids={:?}",
1174                        client_order_id,
1175                        report.contingency_type,
1176                        base,
1177                        linked,
1178                    );
1179                    report.linked_order_ids = Some(linked);
1180                    continue;
1181                }
1182
1183                log::trace!(
1184                    "BitMEX client order id prefix group had no peers: client_order_id={:?}, contingency_type={:?}, base={}, prefix_group={:?}",
1185                    client_order_id,
1186                    report.contingency_type,
1187                    base,
1188                    group,
1189                );
1190                report.parent_order_id = None;
1191            } else if client_order_id.as_str().contains('-') {
1192                report.parent_order_id = None;
1193            }
1194
1195            if Self::is_contingent_order(report.contingency_type) {
1196                log::warn!(
1197                    "BitMEX order status report missing linked ids after grouping: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}",
1198                    report.client_order_id,
1199                    report.order_list_id,
1200                    report.contingency_type,
1201                );
1202                report.contingency_type = ContingencyType::NoContingency;
1203                report.parent_order_id = None;
1204            }
1205
1206            report.linked_order_ids = None;
1207        }
1208    }
1209
1210    /// Cancel all pending HTTP requests.
1211    pub fn cancel_all_requests(&self) {
1212        self.inner.cancel_all_requests();
1213    }
1214
1215    /// Replace the cancellation token so new requests can proceed.
1216    pub fn reset_cancellation_token(&self) {
1217        self.inner.reset_cancellation_token();
1218    }
1219
1220    /// Get a clone of the cancellation token for this client.
1221    pub fn cancellation_token(&self) -> CancellationToken {
1222        self.inner.cancellation_token()
1223    }
1224
1225    /// Caches a single instrument.
1226    ///
1227    /// Any existing instrument with the same symbol will be replaced.
1228    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1229        self.instruments_cache
1230            .insert(instrument.raw_symbol().inner(), instrument);
1231        self.cache_initialized.store(true, Ordering::Release);
1232    }
1233
1234    /// Gets an instrument from the cache by symbol.
1235    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1236        self.instruments_cache
1237            .get(symbol)
1238            .map(|entry| entry.value().clone())
1239    }
1240
1241    /// Request a single instrument and parse it into a Nautilus type.
1242    ///
1243    /// # Errors
1244    ///
1245    /// Returns `Ok(Some(..))` when the venue returns a definition that parses
1246    /// successfully, `Ok(None)` when the instrument is unknown, unsupported, or the payload
1247    /// cannot be converted into a Nautilus `Instrument`.
1248    pub async fn request_instrument(
1249        &self,
1250        instrument_id: InstrumentId,
1251    ) -> anyhow::Result<Option<InstrumentAny>> {
1252        let response = self
1253            .inner
1254            .get_instrument(instrument_id.symbol.as_str())
1255            .await?;
1256
1257        let instrument = match response {
1258            Some(instrument) => instrument,
1259            None => return Ok(None),
1260        };
1261
1262        let ts_init = self.generate_ts_init();
1263
1264        match parse_instrument_any(&instrument, ts_init) {
1265            InstrumentParseResult::Ok(inst) => Ok(Some(*inst)),
1266            InstrumentParseResult::Unsupported {
1267                symbol,
1268                instrument_type,
1269            } => {
1270                log::debug!(
1271                    "Instrument {symbol} has unsupported type {instrument_type:?}, returning None"
1272                );
1273                Ok(None)
1274            }
1275            InstrumentParseResult::Inactive { symbol, state } => {
1276                log::debug!("Instrument {symbol} is inactive (state={state}), returning None");
1277                Ok(None)
1278            }
1279            InstrumentParseResult::Failed {
1280                symbol,
1281                instrument_type,
1282                error,
1283            } => {
1284                log::error!(
1285                    "Failed to parse instrument {symbol} (type={instrument_type:?}): {error}"
1286                );
1287                Ok(None)
1288            }
1289        }
1290    }
1291
1292    /// Request all available instruments and parse them into Nautilus types.
1293    ///
1294    /// # Errors
1295    ///
1296    /// Returns an error if the HTTP request fails or parsing fails.
1297    pub async fn request_instruments(
1298        &self,
1299        active_only: bool,
1300    ) -> anyhow::Result<Vec<InstrumentAny>> {
1301        let instruments = self.inner.get_instruments(active_only).await?;
1302        let ts_init = self.generate_ts_init();
1303
1304        let mut parsed_instruments = Vec::new();
1305        let mut skipped_count = 0;
1306        let mut inactive_count = 0;
1307        let mut failed_count = 0;
1308        let total_count = instruments.len();
1309
1310        for inst in instruments {
1311            match parse_instrument_any(&inst, ts_init) {
1312                InstrumentParseResult::Ok(instrument_any) => {
1313                    parsed_instruments.push(*instrument_any);
1314                }
1315                InstrumentParseResult::Unsupported {
1316                    symbol,
1317                    instrument_type,
1318                } => {
1319                    skipped_count += 1;
1320                    log::debug!(
1321                        "Skipping unsupported instrument type: symbol={symbol}, type={instrument_type:?}"
1322                    );
1323                }
1324                InstrumentParseResult::Inactive { symbol, state } => {
1325                    inactive_count += 1;
1326                    log::debug!("Skipping inactive instrument: symbol={symbol}, state={state}");
1327                }
1328                InstrumentParseResult::Failed {
1329                    symbol,
1330                    instrument_type,
1331                    error,
1332                } => {
1333                    failed_count += 1;
1334                    log::error!(
1335                        "Failed to parse instrument: symbol={symbol}, type={instrument_type:?}, error={error}"
1336                    );
1337                }
1338            }
1339        }
1340
1341        if skipped_count > 0 {
1342            log::info!(
1343                "Skipped {skipped_count} unsupported instrument type(s) out of {total_count} total"
1344            );
1345        }
1346
1347        if inactive_count > 0 {
1348            log::info!(
1349                "Skipped {inactive_count} inactive instrument(s) out of {total_count} total"
1350            );
1351        }
1352
1353        if failed_count > 0 {
1354            log::error!(
1355                "Instrument parse failures: {failed_count} failed out of {total_count} total ({} successfully parsed)",
1356                parsed_instruments.len()
1357            );
1358        }
1359
1360        Ok(parsed_instruments)
1361    }
1362
1363    /// Get user wallet information.
1364    ///
1365    /// # Errors
1366    ///
1367    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1368    pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1369        let inner = self.inner.clone();
1370        inner.get_wallet().await
1371    }
1372
1373    /// Get user orders.
1374    ///
1375    /// # Errors
1376    ///
1377    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1378    pub async fn get_orders(
1379        &self,
1380        params: GetOrderParams,
1381    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1382        let inner = self.inner.clone();
1383        inner.get_orders(params).await
1384    }
1385
1386    /// Get instrument from the instruments cache (if found).
1387    ///
1388    /// # Errors
1389    ///
1390    /// Returns an error if the instrument is not found in the cache.
1391    fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1392        self.get_instrument(&symbol).ok_or_else(|| {
1393            anyhow::anyhow!(
1394                "Instrument {symbol} not found in cache, ensure instruments loaded first"
1395            )
1396        })
1397    }
1398
1399    /// Returns the cached price precision for the given symbol.
1400    ///
1401    /// # Errors
1402    ///
1403    /// Returns an error if the instrument was never cached (for example, if
1404    /// instruments were not loaded prior to use).
1405    pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1406        self.instrument_from_cache(symbol)
1407            .map(|instrument| instrument.price_precision())
1408    }
1409
1410    /// Get user margin information for a specific currency.
1411    ///
1412    /// # Errors
1413    ///
1414    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1415    pub async fn get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1416        self.inner
1417            .get_margin(currency)
1418            .await
1419            .map_err(|e| anyhow::anyhow!(e))
1420    }
1421
1422    /// Get user margin information for all currencies.
1423    ///
1424    /// # Errors
1425    ///
1426    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1427    pub async fn get_all_margins(&self) -> anyhow::Result<Vec<BitmexMargin>> {
1428        self.inner
1429            .get_all_margins()
1430            .await
1431            .map_err(|e| anyhow::anyhow!(e))
1432    }
1433
1434    /// Request account state for the given account.
1435    ///
1436    /// # Errors
1437    ///
1438    /// Returns an error if the HTTP request fails or no account state is returned.
1439    pub async fn request_account_state(
1440        &self,
1441        account_id: AccountId,
1442    ) -> anyhow::Result<AccountState> {
1443        let margins = self
1444            .inner
1445            .get_all_margins()
1446            .await
1447            .map_err(|e| anyhow::anyhow!(e))?;
1448
1449        let ts_init =
1450            UnixNanos::from(chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64);
1451
1452        let mut balances = Vec::with_capacity(margins.len());
1453        let mut latest_timestamp: Option<chrono::DateTime<chrono::Utc>> = None;
1454
1455        for margin in margins {
1456            if let Some(ts) = margin.timestamp {
1457                latest_timestamp = Some(latest_timestamp.map_or(ts, |prev| prev.max(ts)));
1458            }
1459
1460            let margin_msg = BitmexMarginMsg {
1461                account: margin.account,
1462                currency: margin.currency,
1463                risk_limit: margin.risk_limit,
1464                amount: margin.amount,
1465                prev_realised_pnl: margin.prev_realised_pnl,
1466                gross_comm: margin.gross_comm,
1467                gross_open_cost: margin.gross_open_cost,
1468                gross_open_premium: margin.gross_open_premium,
1469                gross_exec_cost: margin.gross_exec_cost,
1470                gross_mark_value: margin.gross_mark_value,
1471                risk_value: margin.risk_value,
1472                init_margin: margin.init_margin,
1473                maint_margin: margin.maint_margin,
1474                target_excess_margin: margin.target_excess_margin,
1475                realised_pnl: margin.realised_pnl,
1476                unrealised_pnl: margin.unrealised_pnl,
1477                wallet_balance: margin.wallet_balance,
1478                margin_balance: margin.margin_balance,
1479                margin_leverage: margin.margin_leverage,
1480                margin_used_pcnt: margin.margin_used_pcnt,
1481                excess_margin: margin.excess_margin,
1482                available_margin: margin.available_margin,
1483                withdrawable_margin: margin.withdrawable_margin,
1484                maker_fee_discount: None,
1485                taker_fee_discount: None,
1486                timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1487                foreign_margin_balance: None,
1488                foreign_requirement: None,
1489            };
1490
1491            balances.push(parse_account_balance(&margin_msg));
1492        }
1493
1494        if balances.is_empty() {
1495            anyhow::bail!("No margin data returned from BitMEX");
1496        }
1497
1498        let account_type = AccountType::Margin;
1499        let margins_vec = Vec::new();
1500        let is_reported = true;
1501        let event_id = UUID4::new();
1502
1503        // Use server timestamp if available, otherwise fall back to local time
1504        let ts_event = latest_timestamp.map_or(ts_init, |ts| {
1505            UnixNanos::from(ts.timestamp_nanos_opt().unwrap_or_default() as u64)
1506        });
1507
1508        Ok(AccountState::new(
1509            account_id,
1510            account_type,
1511            balances,
1512            margins_vec,
1513            is_reported,
1514            event_id,
1515            ts_event,
1516            ts_init,
1517            None,
1518        ))
1519    }
1520
1521    /// Submit a new order.
1522    ///
1523    /// # Errors
1524    ///
1525    /// Returns an error if credentials are missing, the request fails, order validation fails,
1526    /// the order is rejected, or the API returns an error.
1527    #[allow(clippy::too_many_arguments)]
1528    pub async fn submit_order(
1529        &self,
1530        instrument_id: InstrumentId,
1531        client_order_id: ClientOrderId,
1532        order_side: OrderSide,
1533        order_type: OrderType,
1534        quantity: Quantity,
1535        time_in_force: TimeInForce,
1536        price: Option<Price>,
1537        trigger_price: Option<Price>,
1538        trigger_type: Option<TriggerType>,
1539        trailing_offset: Option<f64>,
1540        trailing_offset_type: Option<TrailingOffsetType>,
1541        display_qty: Option<Quantity>,
1542        post_only: bool,
1543        reduce_only: bool,
1544        order_list_id: Option<OrderListId>,
1545        contingency_type: Option<ContingencyType>,
1546        peg_price_type: Option<BitmexPegPriceType>,
1547        peg_offset_value: Option<f64>,
1548    ) -> anyhow::Result<OrderStatusReport> {
1549        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1550
1551        let mut params = super::query::PostOrderParamsBuilder::default();
1552        params.text(NAUTILUS_TRADER);
1553        params.symbol(instrument_id.symbol.as_str());
1554        params.cl_ord_id(client_order_id.as_str());
1555
1556        if order_side == OrderSide::NoOrderSide {
1557            anyhow::bail!("Order side must be Buy or Sell");
1558        }
1559        let side = BitmexSide::from(order_side.as_specified());
1560        params.side(side);
1561
1562        let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1563        params.ord_type(ord_type);
1564
1565        params.order_qty(quantity_to_u32(&quantity, &instrument));
1566
1567        let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1568        params.time_in_force(tif);
1569
1570        if let Some(price) = price {
1571            params.price(price.as_f64());
1572        }
1573
1574        if let Some(trigger_price) = trigger_price {
1575            params.stop_px(trigger_price.as_f64());
1576        }
1577
1578        if let Some(display_qty) = display_qty {
1579            params.display_qty(quantity_to_u32(&display_qty, &instrument));
1580        }
1581
1582        if let Some(order_list_id) = order_list_id {
1583            params.cl_ord_link_id(order_list_id.as_str());
1584        }
1585
1586        let is_trailing_stop = matches!(
1587            order_type,
1588            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
1589        );
1590
1591        if is_trailing_stop && let Some(offset) = trailing_offset {
1592            if let Some(offset_type) = trailing_offset_type
1593                && offset_type != TrailingOffsetType::Price
1594            {
1595                anyhow::bail!(
1596                    "BitMEX only supports PRICE trailing offset type, was {offset_type:?}"
1597                );
1598            }
1599
1600            params.peg_price_type(BitmexPegPriceType::TrailingStopPeg);
1601
1602            // BitMEX requires negative offset for stop-sell orders
1603            let signed_offset = match order_side {
1604                OrderSide::Sell => -offset.abs(),
1605                OrderSide::Buy => offset.abs(),
1606                _ => offset,
1607            };
1608            params.peg_offset_value(signed_offset);
1609        }
1610
1611        // Pegged orders (BBO) via params override
1612        if peg_price_type.is_none() && peg_offset_value.is_some() {
1613            anyhow::bail!("`peg_offset_value` requires `peg_price_type`");
1614        }
1615
1616        if let Some(peg_type) = peg_price_type {
1617            if order_type != OrderType::Limit {
1618                anyhow::bail!(
1619                    "Pegged orders only supported for LIMIT order type, was {order_type:?}"
1620                );
1621            }
1622            params.ord_type(BitmexOrderType::Pegged);
1623            params.peg_price_type(peg_type);
1624
1625            if let Some(offset) = peg_offset_value {
1626                params.peg_offset_value(offset);
1627            }
1628        }
1629
1630        let mut exec_inst = Vec::new();
1631
1632        if post_only {
1633            exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1634        }
1635
1636        if reduce_only {
1637            exec_inst.push(BitmexExecInstruction::ReduceOnly);
1638        }
1639
1640        // For trailing stops, trigger_type specifies which price to track (Mark, Last, Index)
1641        if (trigger_price.is_some() || is_trailing_stop)
1642            && let Some(trigger_type) = trigger_type
1643        {
1644            match trigger_type {
1645                TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1646                TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1647                TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1648                _ => {} // Use BitMEX default (LastPrice) for other trigger types
1649            }
1650        }
1651
1652        if !exec_inst.is_empty() {
1653            params.exec_inst(exec_inst);
1654        }
1655
1656        if let Some(contingency_type) = contingency_type {
1657            let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1658            params.contingency_type(bitmex_contingency);
1659        }
1660
1661        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1662
1663        let response = self.inner.place_order(params).await?;
1664
1665        let order: BitmexOrder = serde_json::from_value(response)?;
1666
1667        if order.ord_status == Some(BitmexOrderStatus::Rejected) {
1668            let reason = order
1669                .ord_rej_reason
1670                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1671            anyhow::bail!("Order rejected: {reason}");
1672        }
1673
1674        // Cache order type for future lookups (e.g., cancel responses missing ord_type)
1675        self.order_type_cache.insert(client_order_id, order_type);
1676
1677        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1678        let ts_init = self.generate_ts_init();
1679
1680        parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1681    }
1682
1683    /// Cancel an order.
1684    ///
1685    /// # Errors
1686    ///
1687    /// Returns an error if:
1688    /// - Credentials are missing.
1689    /// - The request fails.
1690    /// - The order doesn't exist.
1691    /// - The API returns an error.
1692    pub async fn cancel_order(
1693        &self,
1694        instrument_id: InstrumentId,
1695        client_order_id: Option<ClientOrderId>,
1696        venue_order_id: Option<VenueOrderId>,
1697    ) -> anyhow::Result<OrderStatusReport> {
1698        let mut params = super::query::DeleteOrderParamsBuilder::default();
1699        params.text(NAUTILUS_TRADER);
1700
1701        if let Some(venue_order_id) = venue_order_id {
1702            params.order_id(vec![venue_order_id.as_str().to_string()]);
1703        } else if let Some(client_order_id) = client_order_id {
1704            params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1705        } else {
1706            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1707        }
1708
1709        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1710
1711        let response = self.inner.cancel_orders(params).await?;
1712
1713        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1714        let order = orders
1715            .into_iter()
1716            .next()
1717            .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1718
1719        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1720        let ts_init = self.generate_ts_init();
1721
1722        parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1723    }
1724
1725    /// Cancel multiple orders.
1726    ///
1727    /// # Errors
1728    ///
1729    /// Returns an error if:
1730    /// - Credentials are missing.
1731    /// - The request fails.
1732    /// - The order doesn't exist.
1733    /// - The API returns an error.
1734    pub async fn cancel_orders(
1735        &self,
1736        instrument_id: InstrumentId,
1737        client_order_ids: Option<Vec<ClientOrderId>>,
1738        venue_order_ids: Option<Vec<VenueOrderId>>,
1739    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1740        let mut params = super::query::DeleteOrderParamsBuilder::default();
1741        params.text(NAUTILUS_TRADER);
1742
1743        // BitMEX API requires either client order IDs or venue order IDs, not both
1744        // Prioritize venue order IDs if both are provided
1745        if let Some(venue_order_ids) = venue_order_ids {
1746            if venue_order_ids.is_empty() {
1747                anyhow::bail!("venue_order_ids cannot be empty");
1748            }
1749            params.order_id(
1750                venue_order_ids
1751                    .iter()
1752                    .map(|id| id.to_string())
1753                    .collect::<Vec<_>>(),
1754            );
1755        } else if let Some(client_order_ids) = client_order_ids {
1756            if client_order_ids.is_empty() {
1757                anyhow::bail!("client_order_ids cannot be empty");
1758            }
1759            params.cl_ord_id(
1760                client_order_ids
1761                    .iter()
1762                    .map(|id| id.to_string())
1763                    .collect::<Vec<_>>(),
1764            );
1765        } else {
1766            anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1767        }
1768
1769        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1770
1771        let response = self.inner.cancel_orders(params).await?;
1772
1773        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1774
1775        let ts_init = self.generate_ts_init();
1776        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1777
1778        let mut reports = Vec::new();
1779
1780        for order in orders {
1781            reports.push(parse_order_status_report(
1782                &order,
1783                &instrument,
1784                &self.order_type_cache,
1785                ts_init,
1786            )?);
1787        }
1788
1789        Self::populate_linked_order_ids(&mut reports);
1790
1791        Ok(reports)
1792    }
1793
1794    /// Cancel all orders for an instrument and optionally an order side.
1795    ///
1796    /// # Errors
1797    ///
1798    /// Returns an error if:
1799    /// - Credentials are missing.
1800    /// - The request fails.
1801    /// - The order doesn't exist.
1802    /// - The API returns an error.
1803    pub async fn cancel_all_orders(
1804        &self,
1805        instrument_id: InstrumentId,
1806        order_side: Option<OrderSide>,
1807    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1808        let mut params = DeleteAllOrdersParamsBuilder::default();
1809        params.text(NAUTILUS_TRADER);
1810        params.symbol(instrument_id.symbol.as_str());
1811
1812        if let Some(side) = order_side {
1813            if side == OrderSide::NoOrderSide {
1814                log::debug!("Ignoring NoOrderSide filter for cancel_all_orders on {instrument_id}",);
1815            } else {
1816                let side = BitmexSide::from(side.as_specified());
1817                params.filter(serde_json::json!({
1818                    "side": side
1819                }));
1820            }
1821        }
1822
1823        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1824
1825        let response = self.inner.cancel_all_orders(params).await?;
1826
1827        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1828
1829        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1830        let ts_init = self.generate_ts_init();
1831
1832        let mut reports = Vec::new();
1833
1834        for order in orders {
1835            reports.push(parse_order_status_report(
1836                &order,
1837                &instrument,
1838                &self.order_type_cache,
1839                ts_init,
1840            )?);
1841        }
1842
1843        Self::populate_linked_order_ids(&mut reports);
1844
1845        Ok(reports)
1846    }
1847
1848    /// Modify an existing order.
1849    ///
1850    /// # Errors
1851    ///
1852    /// Returns an error if:
1853    /// - Credentials are missing.
1854    /// - The request fails.
1855    /// - The order doesn't exist.
1856    /// - The order is already closed.
1857    /// - The API returns an error.
1858    pub async fn modify_order(
1859        &self,
1860        instrument_id: InstrumentId,
1861        client_order_id: Option<ClientOrderId>,
1862        venue_order_id: Option<VenueOrderId>,
1863        quantity: Option<Quantity>,
1864        price: Option<Price>,
1865        trigger_price: Option<Price>,
1866    ) -> anyhow::Result<OrderStatusReport> {
1867        let mut params = PutOrderParamsBuilder::default();
1868        params.text(NAUTILUS_TRADER);
1869
1870        // Set order ID - prefer venue_order_id if available
1871        if let Some(venue_order_id) = venue_order_id {
1872            params.order_id(venue_order_id.as_str());
1873        } else if let Some(client_order_id) = client_order_id {
1874            params.orig_cl_ord_id(client_order_id.as_str());
1875        } else {
1876            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1877        }
1878
1879        if let Some(quantity) = quantity {
1880            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1881            params.order_qty(quantity_to_u32(&quantity, &instrument));
1882        }
1883
1884        if let Some(price) = price {
1885            params.price(price.as_f64());
1886        }
1887
1888        if let Some(trigger_price) = trigger_price {
1889            params.stop_px(trigger_price.as_f64());
1890        }
1891
1892        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1893
1894        let response = self.inner.amend_order(params).await?;
1895
1896        let order: BitmexOrder = serde_json::from_value(response)?;
1897
1898        if order.ord_status == Some(BitmexOrderStatus::Rejected) {
1899            let reason = order
1900                .ord_rej_reason
1901                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1902            anyhow::bail!("Order modification rejected: {reason}");
1903        }
1904
1905        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1906        let ts_init = self.generate_ts_init();
1907
1908        parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1909    }
1910
1911    /// Query a single order by client order ID or venue order ID.
1912    ///
1913    /// # Errors
1914    ///
1915    /// Returns an error if:
1916    /// - Credentials are missing.
1917    /// - The request fails.
1918    /// - The API returns an error.
1919    pub async fn query_order(
1920        &self,
1921        instrument_id: InstrumentId,
1922        client_order_id: Option<ClientOrderId>,
1923        venue_order_id: Option<VenueOrderId>,
1924    ) -> anyhow::Result<Option<OrderStatusReport>> {
1925        let mut params = GetOrderParamsBuilder::default();
1926
1927        let filter_json = if let Some(client_order_id) = client_order_id {
1928            serde_json::json!({
1929                "clOrdID": client_order_id.to_string()
1930            })
1931        } else if let Some(venue_order_id) = venue_order_id {
1932            serde_json::json!({
1933                "orderID": venue_order_id.to_string()
1934            })
1935        } else {
1936            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1937        };
1938
1939        params.filter(filter_json);
1940        params.count(1); // Only need one order
1941
1942        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1943
1944        let response = self.inner.get_orders(params).await?;
1945
1946        if response.is_empty() {
1947            return Ok(None);
1948        }
1949
1950        let order = &response[0];
1951
1952        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1953        let ts_init = self.generate_ts_init();
1954
1955        let report =
1956            parse_order_status_report(order, &instrument, &self.order_type_cache, ts_init)?;
1957
1958        Ok(Some(report))
1959    }
1960
1961    /// Request a single order status report.
1962    ///
1963    /// # Errors
1964    ///
1965    /// Returns an error if:
1966    /// - Credentials are missing.
1967    /// - The request fails.
1968    /// - The API returns an error.
1969    pub async fn request_order_status_report(
1970        &self,
1971        instrument_id: InstrumentId,
1972        client_order_id: Option<ClientOrderId>,
1973        venue_order_id: Option<VenueOrderId>,
1974    ) -> anyhow::Result<OrderStatusReport> {
1975        if venue_order_id.is_none() && client_order_id.is_none() {
1976            anyhow::bail!("Either venue_order_id or client_order_id must be provided");
1977        }
1978
1979        let mut params = GetOrderParamsBuilder::default();
1980        params.symbol(instrument_id.symbol.as_str());
1981
1982        if let Some(venue_order_id) = venue_order_id {
1983            params.filter(serde_json::json!({
1984                "orderID": venue_order_id.as_str()
1985            }));
1986        } else if let Some(client_order_id) = client_order_id {
1987            params.filter(serde_json::json!({
1988                "clOrdID": client_order_id.as_str()
1989            }));
1990        }
1991
1992        params.count(1i32);
1993        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1994
1995        let response = self.inner.get_orders(params).await?;
1996
1997        let order = response
1998            .into_iter()
1999            .next()
2000            .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
2001
2002        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2003        let ts_init = self.generate_ts_init();
2004
2005        parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
2006    }
2007
2008    /// Request multiple order status reports.
2009    ///
2010    /// # Errors
2011    ///
2012    /// Returns an error if:
2013    /// - Credentials are missing.
2014    /// - The request fails.
2015    /// - The API returns an error.
2016    pub async fn request_order_status_reports(
2017        &self,
2018        instrument_id: Option<InstrumentId>,
2019        open_only: bool,
2020        start: Option<DateTime<Utc>>,
2021        end: Option<DateTime<Utc>>,
2022        limit: Option<u32>,
2023    ) -> anyhow::Result<Vec<OrderStatusReport>> {
2024        if let (Some(start), Some(end)) = (start, end) {
2025            anyhow::ensure!(
2026                start < end,
2027                "Invalid time range: start={start:?} end={end:?}",
2028            );
2029        }
2030
2031        let mut params = GetOrderParamsBuilder::default();
2032
2033        if let Some(instrument_id) = &instrument_id {
2034            params.symbol(instrument_id.symbol.as_str());
2035        }
2036
2037        if open_only {
2038            params.filter(serde_json::json!({
2039                "open": true
2040            }));
2041        }
2042
2043        if let Some(start) = start {
2044            params.start_time(start);
2045        }
2046
2047        if let Some(end) = end {
2048            params.end_time(end);
2049        }
2050
2051        if let Some(limit) = limit {
2052            params.count(limit as i32);
2053        } else {
2054            params.count(500); // Default count to avoid empty query
2055        }
2056
2057        params.reverse(true); // Get newest orders first
2058
2059        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2060
2061        let response = self.inner.get_orders(params).await?;
2062
2063        let ts_init = self.generate_ts_init();
2064
2065        let mut reports = Vec::new();
2066
2067        for order in response {
2068            if let Some(start) = start {
2069                match order.timestamp {
2070                    Some(timestamp) if timestamp < start => continue,
2071                    Some(_) => {}
2072                    None => {
2073                        log::debug!("Skipping order report without timestamp for bounded query");
2074                        continue;
2075                    }
2076                }
2077            }
2078
2079            if let Some(end) = end {
2080                match order.timestamp {
2081                    Some(timestamp) if timestamp > end => continue,
2082                    Some(_) => {}
2083                    None => {
2084                        log::debug!("Skipping order report without timestamp for bounded query");
2085                        continue;
2086                    }
2087                }
2088            }
2089
2090            // Skip orders without symbol (can happen with query responses)
2091            let Some(symbol) = order.symbol else {
2092                log::warn!("Order response missing symbol, skipping");
2093                continue;
2094            };
2095
2096            let Ok(instrument) = self.instrument_from_cache(symbol) else {
2097                log::debug!("Skipping order report for instrument not in cache: symbol={symbol}");
2098                continue;
2099            };
2100
2101            match parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init) {
2102                Ok(report) => reports.push(report),
2103                Err(e) => log::error!("Failed to parse order status report: {e}"),
2104            }
2105        }
2106
2107        Self::populate_linked_order_ids(&mut reports);
2108
2109        Ok(reports)
2110    }
2111
2112    /// Request trades for the given instrument.
2113    ///
2114    /// # Errors
2115    ///
2116    /// Returns an error if the HTTP request fails or parsing fails.
2117    pub async fn request_trades(
2118        &self,
2119        instrument_id: InstrumentId,
2120        start: Option<DateTime<Utc>>,
2121        end: Option<DateTime<Utc>>,
2122        limit: Option<u32>,
2123    ) -> anyhow::Result<Vec<TradeTick>> {
2124        let mut params = GetTradeParamsBuilder::default();
2125        params.symbol(instrument_id.symbol.as_str());
2126
2127        if let Some(start) = start {
2128            params.start_time(start);
2129        }
2130
2131        if let Some(end) = end {
2132            params.end_time(end);
2133        }
2134
2135        if let (Some(start), Some(end)) = (start, end) {
2136            anyhow::ensure!(
2137                start < end,
2138                "Invalid time range: start={start:?} end={end:?}",
2139            );
2140        }
2141
2142        if let Some(limit) = limit {
2143            let clamped_limit = limit.min(1000);
2144            if limit > 1000 {
2145                log::warn!(
2146                    "BitMEX trade request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
2147                );
2148            }
2149            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2150        }
2151        params.reverse(false);
2152        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2153
2154        let response = self.inner.get_trades(params).await?;
2155
2156        let ts_init = self.generate_ts_init();
2157
2158        let mut parsed_trades = Vec::new();
2159
2160        for trade in response {
2161            if let Some(start) = start
2162                && trade.timestamp < start
2163            {
2164                continue;
2165            }
2166
2167            if let Some(end) = end
2168                && trade.timestamp > end
2169            {
2170                continue;
2171            }
2172
2173            let Some(instrument) = self.get_instrument(&trade.symbol) else {
2174                log::error!(
2175                    "Instrument {} not found in cache, skipping trade",
2176                    trade.symbol
2177                );
2178                continue;
2179            };
2180
2181            match parse_trade(trade, &instrument, ts_init) {
2182                Ok(trade) => parsed_trades.push(trade),
2183                Err(e) => log::error!("Failed to parse trade: {e}"),
2184            }
2185        }
2186
2187        Ok(parsed_trades)
2188    }
2189
2190    /// Request bars for the given bar type.
2191    ///
2192    /// # Errors
2193    ///
2194    /// Returns an error if the HTTP request fails, parsing fails, or the bar specification is
2195    /// unsupported by BitMEX.
2196    pub async fn request_bars(
2197        &self,
2198        mut bar_type: BarType,
2199        start: Option<DateTime<Utc>>,
2200        end: Option<DateTime<Utc>>,
2201        limit: Option<u32>,
2202        partial: bool,
2203    ) -> anyhow::Result<Vec<Bar>> {
2204        bar_type = bar_type.standard();
2205
2206        anyhow::ensure!(
2207            bar_type.aggregation_source() == AggregationSource::External,
2208            "Only EXTERNAL aggregation bars are supported"
2209        );
2210        anyhow::ensure!(
2211            bar_type.spec().price_type == PriceType::Last,
2212            "Only LAST price type bars are supported"
2213        );
2214
2215        if let (Some(start), Some(end)) = (start, end) {
2216            anyhow::ensure!(
2217                start < end,
2218                "Invalid time range: start={start:?} end={end:?}"
2219            );
2220        }
2221
2222        let spec = bar_type.spec();
2223        let bin_size = match (spec.aggregation, spec.step.get()) {
2224            (BarAggregation::Minute, 1) => "1m",
2225            (BarAggregation::Minute, 5) => "5m",
2226            (BarAggregation::Hour, 1) => "1h",
2227            (BarAggregation::Day, 1) => "1d",
2228            _ => anyhow::bail!(
2229                "BitMEX does not support {}-{:?}-{:?} bars",
2230                spec.step.get(),
2231                spec.aggregation,
2232                spec.price_type,
2233            ),
2234        };
2235
2236        let instrument_id = bar_type.instrument_id();
2237        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2238
2239        let mut params = GetTradeBucketedParamsBuilder::default();
2240        params.symbol(instrument_id.symbol.as_str());
2241        params.bin_size(bin_size);
2242
2243        if partial {
2244            params.partial(true);
2245        }
2246
2247        if let Some(start) = start {
2248            params.start_time(start);
2249        }
2250
2251        if let Some(end) = end {
2252            params.end_time(end);
2253        }
2254
2255        if let Some(limit) = limit {
2256            let clamped_limit = limit.min(1000);
2257            if limit > 1000 {
2258                log::warn!(
2259                    "BitMEX bar request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
2260                );
2261            }
2262            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2263        }
2264        params.reverse(false);
2265        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2266
2267        let response = self.inner.get_trade_bucketed(params).await?;
2268        let ts_init = self.generate_ts_init();
2269        let mut bars = Vec::new();
2270
2271        for bin in response {
2272            if let Some(start) = start
2273                && bin.timestamp < start
2274            {
2275                continue;
2276            }
2277
2278            if let Some(end) = end
2279                && bin.timestamp > end
2280            {
2281                continue;
2282            }
2283
2284            if bin.symbol != instrument_id.symbol.inner() {
2285                log::warn!(
2286                    "Skipping trade bin for unexpected symbol: symbol={}, expected={}",
2287                    bin.symbol,
2288                    instrument_id.symbol,
2289                );
2290                continue;
2291            }
2292
2293            match parse_trade_bin(bin, &instrument, &bar_type, ts_init) {
2294                Ok(bar) => bars.push(bar),
2295                Err(e) => log::warn!("Failed to parse trade bin: {e}"),
2296            }
2297        }
2298
2299        Ok(bars)
2300    }
2301
2302    /// Request fill reports for the given instrument.
2303    ///
2304    /// # Errors
2305    ///
2306    /// Returns an error if the HTTP request fails or parsing fails.
2307    pub async fn request_fill_reports(
2308        &self,
2309        instrument_id: Option<InstrumentId>,
2310        start: Option<DateTime<Utc>>,
2311        end: Option<DateTime<Utc>>,
2312        limit: Option<u32>,
2313    ) -> anyhow::Result<Vec<FillReport>> {
2314        if let (Some(start), Some(end)) = (start, end) {
2315            anyhow::ensure!(
2316                start < end,
2317                "Invalid time range: start={start:?} end={end:?}",
2318            );
2319        }
2320
2321        let mut params = GetExecutionParamsBuilder::default();
2322
2323        if let Some(instrument_id) = instrument_id {
2324            params.symbol(instrument_id.symbol.as_str());
2325        }
2326
2327        if let Some(start) = start {
2328            params.start_time(start);
2329        }
2330
2331        if let Some(end) = end {
2332            params.end_time(end);
2333        }
2334
2335        if let Some(limit) = limit {
2336            params.count(limit as i32);
2337        } else {
2338            params.count(500); // Default count
2339        }
2340        params.reverse(true); // Get newest fills first
2341
2342        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2343
2344        let response = self.inner.get_executions(params).await?;
2345
2346        let ts_init = self.generate_ts_init();
2347
2348        let mut reports = Vec::new();
2349
2350        for exec in response {
2351            if let Some(start) = start {
2352                match exec.transact_time {
2353                    Some(timestamp) if timestamp < start => continue,
2354                    Some(_) => {}
2355                    None => {
2356                        log::debug!("Skipping fill report without transact_time for bounded query");
2357                        continue;
2358                    }
2359                }
2360            }
2361
2362            if let Some(end) = end {
2363                match exec.transact_time {
2364                    Some(timestamp) if timestamp > end => continue,
2365                    Some(_) => {}
2366                    None => {
2367                        log::debug!("Skipping fill report without transact_time for bounded query");
2368                        continue;
2369                    }
2370                }
2371            }
2372
2373            // Skip executions without symbol (e.g., CancelReject)
2374            let Some(symbol) = exec.symbol else {
2375                log::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2376                continue;
2377            };
2378            let symbol_str = symbol.to_string();
2379
2380            let instrument = match self.instrument_from_cache(symbol) {
2381                Ok(instrument) => instrument,
2382                Err(e) => {
2383                    log::error!(
2384                        "Instrument not found in cache for execution parsing: symbol={symbol_str}, {e}"
2385                    );
2386                    continue;
2387                }
2388            };
2389
2390            match parse_fill_report(exec, &instrument, ts_init) {
2391                Ok(report) => reports.push(report),
2392                Err(e) => {
2393                    // Log at debug level for expected skip cases
2394                    let error_msg = e.to_string();
2395                    if error_msg.starts_with("Skipping non-trade execution")
2396                        || error_msg.starts_with("Skipping execution without order_id")
2397                    {
2398                        log::debug!("{e}");
2399                    } else {
2400                        log::error!("Failed to parse fill report: {e}");
2401                    }
2402                }
2403            }
2404        }
2405
2406        Ok(reports)
2407    }
2408
2409    /// Request position reports.
2410    ///
2411    /// # Errors
2412    ///
2413    /// Returns an error if the HTTP request fails or parsing fails.
2414    pub async fn request_position_status_reports(
2415        &self,
2416    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2417        let params = GetPositionParamsBuilder::default()
2418            .count(500) // Default count
2419            .build()
2420            .map_err(|e| anyhow::anyhow!(e))?;
2421
2422        let response = self.inner.get_positions(params).await?;
2423
2424        let ts_init = self.generate_ts_init();
2425
2426        let mut reports = Vec::new();
2427
2428        for pos in response {
2429            let symbol = pos.symbol;
2430            let instrument = match self.instrument_from_cache(symbol) {
2431                Ok(instrument) => instrument,
2432                Err(e) => {
2433                    log::error!(
2434                        "Instrument not found in cache for position parsing: symbol={}, {e}",
2435                        pos.symbol.as_str(),
2436                    );
2437                    continue;
2438                }
2439            };
2440
2441            match parse_position_report(pos, &instrument, ts_init) {
2442                Ok(report) => reports.push(report),
2443                Err(e) => log::error!("Failed to parse position report: {e}"),
2444            }
2445        }
2446
2447        Ok(reports)
2448    }
2449
2450    /// Update position leverage.
2451    ///
2452    /// # Errors
2453    ///
2454    /// - Credentials are missing.
2455    /// - The request fails.
2456    /// - The API returns an error.
2457    pub async fn update_position_leverage(
2458        &self,
2459        symbol: &str,
2460        leverage: f64,
2461    ) -> anyhow::Result<PositionStatusReport> {
2462        let params = PostPositionLeverageParams {
2463            symbol: symbol.to_string(),
2464            leverage,
2465            target_account_id: None,
2466        };
2467
2468        let response = self.inner.update_position_leverage(params).await?;
2469
2470        let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2471        let ts_init = self.generate_ts_init();
2472
2473        parse_position_report(response, &instrument, ts_init)
2474    }
2475}
2476
2477#[cfg(test)]
2478mod tests {
2479    use nautilus_core::UUID4;
2480    use nautilus_model::enums::OrderStatus;
2481    use rstest::rstest;
2482    use serde_json::json;
2483
2484    use super::*;
2485
2486    fn build_report(
2487        client_order_id: &str,
2488        venue_order_id: &str,
2489        contingency_type: ContingencyType,
2490        order_list_id: Option<&str>,
2491    ) -> OrderStatusReport {
2492        let mut report = OrderStatusReport::new(
2493            AccountId::from("BITMEX-1"),
2494            InstrumentId::from("XBTUSD.BITMEX"),
2495            Some(ClientOrderId::from(client_order_id)),
2496            VenueOrderId::from(venue_order_id),
2497            OrderSide::Buy,
2498            OrderType::Limit,
2499            TimeInForce::Gtc,
2500            OrderStatus::Accepted,
2501            Quantity::new(100.0, 0),
2502            Quantity::default(),
2503            UnixNanos::from(1_u64),
2504            UnixNanos::from(1_u64),
2505            UnixNanos::from(1_u64),
2506            Some(UUID4::new()),
2507        );
2508
2509        if let Some(id) = order_list_id {
2510            report = report.with_order_list_id(OrderListId::from(id));
2511        }
2512
2513        report.with_contingency_type(contingency_type)
2514    }
2515
2516    #[rstest]
2517    fn test_sign_request_generates_correct_headers() {
2518        let client = BitmexRawHttpClient::with_credentials(
2519            "test_api_key".to_string(),
2520            "test_api_secret".to_string(),
2521            "http://localhost:8080".to_string(),
2522            Some(60),
2523            None, // max_retries
2524            None, // retry_delay_ms
2525            None, // retry_delay_max_ms
2526            None, // recv_window_ms
2527            None, // max_requests_per_second
2528            None, // max_requests_per_minute
2529            None, // proxy_url
2530        )
2531        .expect("Failed to create test client");
2532
2533        let headers = client
2534            .sign_request(&Method::GET, "/api/v1/order", None)
2535            .unwrap();
2536
2537        assert!(headers.contains_key("api-key"));
2538        assert!(headers.contains_key("api-signature"));
2539        assert!(headers.contains_key("api-expires"));
2540        assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
2541    }
2542
2543    #[rstest]
2544    fn test_sign_request_with_body() {
2545        let client = BitmexRawHttpClient::with_credentials(
2546            "test_api_key".to_string(),
2547            "test_api_secret".to_string(),
2548            "http://localhost:8080".to_string(),
2549            Some(60),
2550            None, // max_retries
2551            None, // retry_delay_ms
2552            None, // retry_delay_max_ms
2553            None, // recv_window_ms
2554            None, // max_requests_per_second
2555            None, // max_requests_per_minute
2556            None, // proxy_url
2557        )
2558        .expect("Failed to create test client");
2559
2560        let body = json!({"symbol": "XBTUSD", "orderQty": 100});
2561        let body_bytes = serde_json::to_vec(&body).unwrap();
2562
2563        let headers_without_body = client
2564            .sign_request(&Method::POST, "/api/v1/order", None)
2565            .unwrap();
2566        let headers_with_body = client
2567            .sign_request(&Method::POST, "/api/v1/order", Some(&body_bytes))
2568            .unwrap();
2569
2570        // Signatures should be different when body is included
2571        assert_ne!(
2572            headers_without_body.get("api-signature").unwrap(),
2573            headers_with_body.get("api-signature").unwrap()
2574        );
2575    }
2576
2577    #[rstest]
2578    fn test_sign_request_uses_custom_recv_window() {
2579        let client_default = BitmexRawHttpClient::with_credentials(
2580            "test_api_key".to_string(),
2581            "test_api_secret".to_string(),
2582            "http://localhost:8080".to_string(),
2583            Some(60),
2584            None,
2585            None,
2586            None,
2587            None, // Use default recv_window_ms (10000ms = 10s)
2588            None, // max_requests_per_second
2589            None, // max_requests_per_minute
2590            None, // proxy_url
2591        )
2592        .expect("Failed to create test client");
2593
2594        let client_custom = BitmexRawHttpClient::with_credentials(
2595            "test_api_key".to_string(),
2596            "test_api_secret".to_string(),
2597            "http://localhost:8080".to_string(),
2598            Some(60),
2599            None,
2600            None,
2601            None,
2602            Some(30_000), // 30 seconds
2603            None,         // max_requests_per_second
2604            None,         // max_requests_per_minute
2605            None,         // proxy_url
2606        )
2607        .expect("Failed to create test client");
2608
2609        let headers_default = client_default
2610            .sign_request(&Method::GET, "/api/v1/order", None)
2611            .unwrap();
2612        let headers_custom = client_custom
2613            .sign_request(&Method::GET, "/api/v1/order", None)
2614            .unwrap();
2615
2616        // Parse expires timestamps
2617        let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
2618        let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();
2619
2620        // Verify both are valid future timestamps
2621        let now = Utc::now().timestamp();
2622        assert!(expires_default > now);
2623        assert!(expires_custom > now);
2624
2625        // Custom window should be greater than default
2626        assert!(expires_custom > expires_default);
2627
2628        // The difference should be approximately 20 seconds (30s - 10s)
2629        // Allow wider tolerance for delays between calls on slow CI runners
2630        let diff = expires_custom - expires_default;
2631        assert!((18..=25).contains(&diff));
2632    }
2633
2634    #[rstest]
2635    fn test_populate_linked_order_ids_from_order_list() {
2636        let base = "O-20250922-002219-001-000";
2637        let entry = format!("{base}-1");
2638        let stop = format!("{base}-2");
2639        let take = format!("{base}-3");
2640
2641        let mut reports = vec![
2642            build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
2643            build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
2644            build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
2645        ];
2646
2647        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2648
2649        assert_eq!(
2650            reports[0].linked_order_ids,
2651            Some(vec![
2652                ClientOrderId::from(stop.as_str()),
2653                ClientOrderId::from(take.as_str()),
2654            ]),
2655        );
2656        assert_eq!(
2657            reports[1].linked_order_ids,
2658            Some(vec![
2659                ClientOrderId::from(entry.as_str()),
2660                ClientOrderId::from(take.as_str()),
2661            ]),
2662        );
2663        assert_eq!(
2664            reports[2].linked_order_ids,
2665            Some(vec![
2666                ClientOrderId::from(entry.as_str()),
2667                ClientOrderId::from(stop.as_str()),
2668            ]),
2669        );
2670    }
2671
2672    #[rstest]
2673    fn test_populate_linked_order_ids_from_id_prefix() {
2674        let base = "O-20250922-002220-001-000";
2675        let entry = format!("{base}-1");
2676        let stop = format!("{base}-2");
2677        let take = format!("{base}-3");
2678
2679        let mut reports = vec![
2680            build_report(&entry, "V-1", ContingencyType::Oto, None),
2681            build_report(&stop, "V-2", ContingencyType::Ouo, None),
2682            build_report(&take, "V-3", ContingencyType::Ouo, None),
2683        ];
2684
2685        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2686
2687        assert_eq!(
2688            reports[0].linked_order_ids,
2689            Some(vec![
2690                ClientOrderId::from(stop.as_str()),
2691                ClientOrderId::from(take.as_str()),
2692            ]),
2693        );
2694        assert_eq!(
2695            reports[1].linked_order_ids,
2696            Some(vec![
2697                ClientOrderId::from(entry.as_str()),
2698                ClientOrderId::from(take.as_str()),
2699            ]),
2700        );
2701        assert_eq!(
2702            reports[2].linked_order_ids,
2703            Some(vec![
2704                ClientOrderId::from(entry.as_str()),
2705                ClientOrderId::from(stop.as_str()),
2706            ]),
2707        );
2708    }
2709
2710    #[rstest]
2711    fn test_populate_linked_order_ids_respects_non_contingent_orders() {
2712        let base = "O-20250922-002221-001-000";
2713        let entry = format!("{base}-1");
2714        let passive = format!("{base}-2");
2715
2716        let mut reports = vec![
2717            build_report(&entry, "V-1", ContingencyType::NoContingency, None),
2718            build_report(&passive, "V-2", ContingencyType::Ouo, None),
2719        ];
2720
2721        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2722
2723        // Non-contingent orders should not be linked
2724        assert!(reports[0].linked_order_ids.is_none());
2725
2726        // A contingent order with no other contingent peers should have contingency reset
2727        assert!(reports[1].linked_order_ids.is_none());
2728        assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
2729    }
2730}