1use std::{
26 collections::HashMap,
27 num::NonZeroU32,
28 sync::{
29 Arc, LazyLock, RwLock,
30 atomic::{AtomicBool, Ordering},
31 },
32};
33
34use chrono::{DateTime, Utc};
35use dashmap::DashMap;
36use nautilus_core::{
37 AtomicTime, UUID4, UnixNanos,
38 consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
39 env::get_or_env_var_opt,
40 time::get_atomic_clock_realtime,
41};
42use nautilus_model::{
43 data::{Bar, BarType, TradeTick},
44 enums::{
45 AccountType, AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType,
46 PriceType, TimeInForce, TrailingOffsetType, TriggerType,
47 },
48 events::AccountState,
49 identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
50 instruments::{Instrument as InstrumentTrait, InstrumentAny},
51 reports::{FillReport, OrderStatusReport, PositionStatusReport},
52 types::{Price, Quantity},
53};
54use nautilus_network::{
55 http::{HttpClient, Method, StatusCode, USER_AGENT},
56 ratelimiter::quota::Quota,
57 retry::{RetryConfig, RetryManager},
58};
59use serde::{Deserialize, Serialize, de::DeserializeOwned};
60use serde_json::Value;
61use tokio_util::sync::CancellationToken;
62use ustr::Ustr;
63
64use super::{
65 error::{BitmexErrorResponse, BitmexHttpError},
66 models::{
67 BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
68 BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
69 },
70 query::{
71 DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
72 GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
73 GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder,
74 PostCancelAllAfterParams, PostOrderParams, PostPositionLeverageParams, PutOrderParams,
75 },
76};
77use crate::{
78 common::{
79 consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
80 credential::{Credential, credential_env_vars},
81 enums::{
82 BitmexContingencyType, BitmexExecInstruction, BitmexOrderStatus, BitmexOrderType,
83 BitmexPegPriceType, BitmexSide, BitmexTimeInForce,
84 },
85 parse::{parse_account_balance, quantity_to_u32},
86 },
87 http::{
88 parse::{
89 InstrumentParseResult, parse_fill_report, parse_instrument_any,
90 parse_order_status_report, parse_position_report, parse_trade, parse_trade_bin,
91 },
92 query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
93 },
94 websocket::messages::BitmexMarginMsg,
95};
96
/// Default per-second request quota, used when the caller supplies no
/// `max_requests_per_second` (or supplies zero).
const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
/// Default per-minute request quota for clients constructed with credentials.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
/// Default per-minute request quota for clients constructed without credentials.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;

/// Rate limiter key under which the per-second quota is registered.
const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
/// Rate limiter key under which the per-minute quota is registered.
const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";
108
109static RATE_LIMIT_KEYS: LazyLock<Vec<Ustr>> = LazyLock::new(|| {
110 vec![
111 Ustr::from(BITMEX_GLOBAL_RATE_KEY),
112 Ustr::from(BITMEX_MINUTE_RATE_KEY),
113 ]
114});
115
/// Generic envelope holding a list of `T` items under a `data` field.
#[derive(Debug, Serialize, Deserialize)]
pub struct BitmexResponse<T> {
    /// The items carried by the response.
    pub data: Vec<T>,
}
122
/// Low-level BitMEX HTTP client that signs, rate-limits, and retries requests.
#[derive(Debug, Clone)]
pub struct BitmexRawHttpClient {
    /// Base URL that endpoint paths are appended to.
    base_url: String,
    /// Underlying HTTP client, configured with the rate limiter quotas.
    client: HttpClient,
    /// API credential used to sign requests; `None` for unauthenticated clients.
    credential: Option<Credential>,
    /// Receive window in milliseconds, used to compute the `api-expires` value.
    recv_window_ms: u64,
    /// Retry policy applied to every request.
    retry_manager: RetryManager<BitmexHttpError>,
    /// Token used to cancel in-flight requests; it can be swapped for a fresh
    /// one via `reset_cancellation_token`.
    cancellation_token: Arc<RwLock<CancellationToken>>,
}
151
152impl Default for BitmexRawHttpClient {
153 fn default() -> Self {
154 Self::new(None, Some(60), None, None, None, None, None, None, None)
155 .expect("Failed to create default BitmexHttpInnerClient")
156 }
157}
158
159impl BitmexRawHttpClient {
160 #[allow(clippy::too_many_arguments)]
170 pub fn new(
171 base_url: Option<String>,
172 timeout_secs: Option<u64>,
173 max_retries: Option<u32>,
174 retry_delay_ms: Option<u64>,
175 retry_delay_max_ms: Option<u64>,
176 recv_window_ms: Option<u64>,
177 max_requests_per_second: Option<u32>,
178 max_requests_per_minute: Option<u32>,
179 proxy_url: Option<String>,
180 ) -> Result<Self, BitmexHttpError> {
181 let retry_config = RetryConfig {
182 max_retries: max_retries.unwrap_or(3),
183 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
184 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
185 backoff_factor: 2.0,
186 jitter_ms: 1000,
187 operation_timeout_ms: Some(60_000),
188 immediate_first: false,
189 max_elapsed_ms: Some(180_000),
190 };
191
192 let retry_manager = RetryManager::new(retry_config);
193
194 let max_req_per_sec =
195 max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
196 let max_req_per_min =
197 max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED);
198
199 Ok(Self {
200 base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
201 client: HttpClient::new(
202 Self::default_headers(),
203 vec![],
204 Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min)?,
205 Some(Self::default_quota(max_req_per_sec)?),
206 timeout_secs,
207 proxy_url,
208 )
209 .map_err(|e| {
210 BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
211 })?,
212 credential: None,
213 recv_window_ms: recv_window_ms.unwrap_or(10_000),
214 retry_manager,
215 cancellation_token: Arc::new(RwLock::new(CancellationToken::new())),
216 })
217 }
218
219 #[allow(clippy::too_many_arguments)]
226 pub fn with_credentials(
227 api_key: String,
228 api_secret: String,
229 base_url: String,
230 timeout_secs: Option<u64>,
231 max_retries: Option<u32>,
232 retry_delay_ms: Option<u64>,
233 retry_delay_max_ms: Option<u64>,
234 recv_window_ms: Option<u64>,
235 max_requests_per_second: Option<u32>,
236 max_requests_per_minute: Option<u32>,
237 proxy_url: Option<String>,
238 ) -> Result<Self, BitmexHttpError> {
239 let retry_config = RetryConfig {
240 max_retries: max_retries.unwrap_or(3),
241 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
242 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
243 backoff_factor: 2.0,
244 jitter_ms: 1000,
245 operation_timeout_ms: Some(60_000),
246 immediate_first: false,
247 max_elapsed_ms: Some(180_000),
248 };
249
250 let retry_manager = RetryManager::new(retry_config);
251
252 let max_req_per_sec =
253 max_requests_per_second.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND);
254 let max_req_per_min =
255 max_requests_per_minute.unwrap_or(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED);
256
257 Ok(Self {
258 base_url,
259 client: HttpClient::new(
260 Self::default_headers(),
261 vec![],
262 Self::rate_limiter_quotas(max_req_per_sec, max_req_per_min)?,
263 Some(Self::default_quota(max_req_per_sec)?),
264 timeout_secs,
265 proxy_url,
266 )
267 .map_err(|e| {
268 BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
269 })?,
270 credential: Some(Credential::new(api_key, api_secret)),
271 recv_window_ms: recv_window_ms.unwrap_or(10_000),
272 retry_manager,
273 cancellation_token: Arc::new(RwLock::new(CancellationToken::new())),
274 })
275 }
276
277 fn default_headers() -> HashMap<String, String> {
278 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
279 }
280
281 fn default_quota(max_requests_per_second: u32) -> Result<Quota, BitmexHttpError> {
282 let burst = NonZeroU32::new(max_requests_per_second)
283 .unwrap_or(NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).expect("non-zero"));
284 Quota::per_second(burst).ok_or_else(|| {
285 BitmexHttpError::ValidationError(format!(
286 "Invalid max_requests_per_second: {max_requests_per_second} exceeds maximum"
287 ))
288 })
289 }
290
291 fn rate_limiter_quotas(
292 max_requests_per_second: u32,
293 max_requests_per_minute: u32,
294 ) -> Result<Vec<(String, Quota)>, BitmexHttpError> {
295 let per_sec_quota = Self::default_quota(max_requests_per_second)?;
296 let per_min_quota =
297 Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
298 NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED)
299 .expect("non-zero")
300 }));
301
302 Ok(vec![
303 (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
304 (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
305 ])
306 }
307
308 fn rate_limit_keys() -> Vec<Ustr> {
309 RATE_LIMIT_KEYS.clone()
310 }
311
312 pub fn cancel_all_requests(&self) {
318 self.cancellation_token
319 .read()
320 .expect("cancellation token lock poisoned")
321 .cancel();
322 }
323
324 pub fn reset_cancellation_token(&self) {
330 *self
331 .cancellation_token
332 .write()
333 .expect("cancellation token lock poisoned") = CancellationToken::new();
334 }
335
336 pub fn cancellation_token(&self) -> CancellationToken {
342 self.cancellation_token
343 .read()
344 .expect("cancellation token lock poisoned")
345 .clone()
346 }
347
348 fn sign_request(
349 &self,
350 method: &Method,
351 endpoint: &str,
352 body: Option<&[u8]>,
353 ) -> Result<HashMap<String, String>, BitmexHttpError> {
354 let credential = self
355 .credential
356 .as_ref()
357 .ok_or(BitmexHttpError::MissingCredentials)?;
358
359 let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
360 let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
361
362 let full_path = if endpoint.starts_with("/api/v1") {
363 endpoint.to_string()
364 } else {
365 format!("/api/v1{endpoint}")
366 };
367
368 let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
369
370 let mut headers = HashMap::new();
371 headers.insert("api-expires".to_string(), expires.to_string());
372 headers.insert("api-key".to_string(), credential.api_key().to_string());
373 headers.insert("api-signature".to_string(), signature);
374
375 if body.is_some()
377 && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
378 {
379 headers.insert(
380 "Content-Type".to_string(),
381 "application/x-www-form-urlencoded".to_string(),
382 );
383 }
384
385 Ok(headers)
386 }
387
388 async fn send_request<T: DeserializeOwned, P: Serialize>(
389 &self,
390 method: Method,
391 endpoint: &str,
392 params: Option<&P>,
393 body: Option<Vec<u8>>,
394 authenticate: bool,
395 ) -> Result<T, BitmexHttpError> {
396 let endpoint = endpoint.to_string();
397 let method_clone = method.clone();
398 let body_clone = body.clone();
399
400 let params_str = if method == Method::GET || method == Method::DELETE {
403 params
404 .map(serde_urlencoded::to_string)
405 .transpose()
406 .map_err(|e| {
407 BitmexHttpError::JsonError(format!("Failed to serialize params: {e}"))
408 })?
409 } else {
410 None
411 };
412
413 let full_endpoint = match params_str {
414 Some(ref query) if !query.is_empty() => format!("{endpoint}?{query}"),
415 _ => endpoint.clone(),
416 };
417
418 let url = format!("{}{}", self.base_url, full_endpoint);
419
420 let operation = || {
421 let url = url.clone();
422 let method = method_clone.clone();
423 let body = body_clone.clone();
424 let full_endpoint = full_endpoint.clone();
425
426 async move {
427 let headers = if authenticate {
428 Some(self.sign_request(&method, &full_endpoint, body.as_deref())?)
429 } else {
430 None
431 };
432
433 let rate_keys = Self::rate_limit_keys();
434 let resp = self
435 .client
436 .request_with_ustr_keys(method, url, None, headers, body, None, Some(rate_keys))
437 .await?;
438
439 if resp.status.is_success() {
440 serde_json::from_slice(&resp.body).map_err(Into::into)
441 } else if let Ok(error_resp) =
442 serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
443 {
444 Err(error_resp.into())
445 } else {
446 Err(BitmexHttpError::UnexpectedStatus {
447 status: StatusCode::from_u16(resp.status.as_u16())
448 .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
449 body: String::from_utf8_lossy(&resp.body).to_string(),
450 })
451 }
452 }
453 };
454
455 let should_retry = |error: &BitmexHttpError| -> bool {
472 match error {
473 BitmexHttpError::NetworkError(_) => true,
474 BitmexHttpError::UnexpectedStatus { status, .. } => {
475 status.as_u16() >= 500 || status.as_u16() == 429
476 }
477 BitmexHttpError::BitmexError {
478 error_name,
479 message,
480 } => {
481 error_name == "RateLimitError"
482 || (error_name == "HTTPError"
483 && message.to_lowercase().contains("rate limit"))
484 }
485 _ => false,
486 }
487 };
488
489 let create_error = |msg: String| -> BitmexHttpError {
490 if msg == "canceled" {
491 BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
492 } else {
493 BitmexHttpError::NetworkError(msg)
494 }
495 };
496
497 let cancel_token = self.cancellation_token();
498
499 self.retry_manager
500 .execute_with_retry_with_cancel(
501 endpoint.as_str(),
502 operation,
503 should_retry,
504 create_error,
505 &cancel_token,
506 )
507 .await
508 }
509
510 pub async fn get_instruments(
516 &self,
517 active_only: bool,
518 ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
519 let path = if active_only {
520 "/instrument/active"
521 } else {
522 "/instrument"
523 };
524 self.send_request::<_, ()>(Method::GET, path, None, None, false)
525 .await
526 }
527
528 pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
538 let response: BitmexApiInfo = self
539 .send_request::<_, ()>(Method::GET, "", None, None, false)
540 .await?;
541 Ok(response.timestamp)
542 }
543
544 pub async fn get_instrument(
555 &self,
556 symbol: &str,
557 ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
558 let path = &format!("/instrument?symbol={symbol}");
559 let instruments: Vec<BitmexInstrument> = self
560 .send_request::<_, ()>(Method::GET, path, None, None, false)
561 .await?;
562
563 Ok(instruments.into_iter().next())
564 }
565
566 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
572 let endpoint = "/user/wallet";
573 self.send_request::<_, ()>(Method::GET, endpoint, None, None, true)
574 .await
575 }
576
577 pub async fn get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
583 let path = format!("/user/margin?currency={currency}");
584 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
585 .await
586 }
587
588 pub async fn get_all_margins(&self) -> Result<Vec<BitmexMargin>, BitmexHttpError> {
594 self.send_request::<_, ()>(Method::GET, "/user/margin?currency=all", None, None, true)
595 .await
596 }
597
598 pub async fn get_trades(
604 &self,
605 params: GetTradeParams,
606 ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
607 self.send_request(Method::GET, "/trade", Some(¶ms), None, true)
608 .await
609 }
610
611 pub async fn get_trade_bucketed(
617 &self,
618 params: GetTradeBucketedParams,
619 ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
620 self.send_request(Method::GET, "/trade/bucketed", Some(¶ms), None, true)
621 .await
622 }
623
624 pub async fn get_orders(
630 &self,
631 params: GetOrderParams,
632 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
633 self.send_request(Method::GET, "/order", Some(¶ms), None, true)
634 .await
635 }
636
637 pub async fn place_order(&self, params: PostOrderParams) -> Result<Value, BitmexHttpError> {
643 let body = serde_urlencoded::to_string(¶ms)
645 .map_err(|e| {
646 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
647 })?
648 .into_bytes();
649 let path = "/order";
650 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
651 .await
652 }
653
654 pub async fn cancel_orders(&self, params: DeleteOrderParams) -> Result<Value, BitmexHttpError> {
660 let body = serde_urlencoded::to_string(¶ms)
662 .map_err(|e| {
663 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
664 })?
665 .into_bytes();
666 let path = "/order";
667 self.send_request::<_, ()>(Method::DELETE, path, None, Some(body), true)
668 .await
669 }
670
671 pub async fn amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
677 let body = serde_urlencoded::to_string(¶ms)
679 .map_err(|e| {
680 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
681 })?
682 .into_bytes();
683 let path = "/order";
684 self.send_request::<_, ()>(Method::PUT, path, None, Some(body), true)
685 .await
686 }
687
688 pub async fn cancel_all_orders(
698 &self,
699 params: DeleteAllOrdersParams,
700 ) -> Result<Value, BitmexHttpError> {
701 self.send_request(Method::DELETE, "/order/all", Some(¶ms), None, true)
702 .await
703 }
704
705 pub async fn cancel_all_after(
717 &self,
718 params: PostCancelAllAfterParams,
719 ) -> Result<Value, BitmexHttpError> {
720 let body = serde_urlencoded::to_string(¶ms)
721 .map_err(|e| {
722 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
723 })?
724 .into_bytes();
725 self.send_request::<_, ()>(
726 Method::POST,
727 "/order/cancelAllAfter",
728 None,
729 Some(body),
730 true,
731 )
732 .await
733 }
734
735 pub async fn get_executions(
741 &self,
742 params: GetExecutionParams,
743 ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
744 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
745 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
746 })?;
747 let path = format!("/execution/tradeHistory?{query}");
748 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
749 .await
750 }
751
752 pub async fn get_positions(
758 &self,
759 params: GetPositionParams,
760 ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
761 self.send_request(Method::GET, "/position", Some(¶ms), None, true)
762 .await
763 }
764
765 pub async fn update_position_leverage(
771 &self,
772 params: PostPositionLeverageParams,
773 ) -> Result<BitmexPosition, BitmexHttpError> {
774 let body = serde_urlencoded::to_string(¶ms)
776 .map_err(|e| {
777 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
778 })?
779 .into_bytes();
780 let path = "/position/leverage";
781 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
782 .await
783 }
784}
785
/// Higher-level BitMEX HTTP client that wraps a shared [`BitmexRawHttpClient`]
/// and adds instrument caching on top of it.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex", from_py_object)
)]
pub struct BitmexHttpClient {
    /// Cached instruments keyed by raw symbol; shared between clones.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Order types keyed by client order ID; shared between clones.
    // NOTE(review): where this cache is populated is not visible in this part of the file.
    pub(crate) order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
    /// Clock used to generate `ts_init` timestamps.
    clock: &'static AtomicTime,
    /// The raw client that performs the actual HTTP requests.
    inner: Arc<BitmexRawHttpClient>,
    /// Set to `true` once an instrument has been cached.
    cache_initialized: AtomicBool,
}
802
803impl Clone for BitmexHttpClient {
804 fn clone(&self) -> Self {
805 let cache_initialized = AtomicBool::new(false);
806
807 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
808 if is_initialized {
809 cache_initialized.store(true, Ordering::Release);
810 }
811
812 Self {
813 inner: self.inner.clone(),
814 instruments_cache: self.instruments_cache.clone(),
815 order_type_cache: self.order_type_cache.clone(),
816 cache_initialized,
817 clock: self.clock,
818 }
819 }
820}
821
822impl Default for BitmexHttpClient {
823 fn default() -> Self {
824 Self::new(
825 None,
826 None,
827 None,
828 false,
829 Some(60),
830 None,
831 None,
832 None,
833 None,
834 None,
835 None,
836 None, )
838 .expect("Failed to create default BitmexHttpClient")
839 }
840}
841
842impl BitmexHttpClient {
843 #[allow(clippy::too_many_arguments)]
849 pub fn new(
850 base_url: Option<String>,
851 api_key: Option<String>,
852 api_secret: Option<String>,
853 testnet: bool,
854 timeout_secs: Option<u64>,
855 max_retries: Option<u32>,
856 retry_delay_ms: Option<u64>,
857 retry_delay_max_ms: Option<u64>,
858 recv_window_ms: Option<u64>,
859 max_requests_per_second: Option<u32>,
860 max_requests_per_minute: Option<u32>,
861 proxy_url: Option<String>,
862 ) -> Result<Self, BitmexHttpError> {
863 let url = base_url.unwrap_or_else(|| {
865 if testnet {
866 BITMEX_HTTP_TESTNET_URL.to_string()
867 } else {
868 BITMEX_HTTP_URL.to_string()
869 }
870 });
871
872 let (key_var, secret_var) = credential_env_vars(testnet);
873 let api_key = get_or_env_var_opt(api_key, key_var);
874 let api_secret = get_or_env_var_opt(api_secret, secret_var);
875
876 let inner = match (api_key, api_secret) {
877 (Some(key), Some(secret)) => BitmexRawHttpClient::with_credentials(
878 key,
879 secret,
880 url,
881 timeout_secs,
882 max_retries,
883 retry_delay_ms,
884 retry_delay_max_ms,
885 recv_window_ms,
886 max_requests_per_second,
887 max_requests_per_minute,
888 proxy_url,
889 )?,
890 (Some(_), None) | (None, Some(_)) => {
891 return Err(BitmexHttpError::ValidationError(
892 "Both api_key and api_secret must be provided, or neither".to_string(),
893 ));
894 }
895 (None, None) => BitmexRawHttpClient::new(
896 Some(url),
897 timeout_secs,
898 max_retries,
899 retry_delay_ms,
900 retry_delay_max_ms,
901 recv_window_ms,
902 max_requests_per_second,
903 max_requests_per_minute,
904 proxy_url,
905 )?,
906 };
907
908 Ok(Self {
909 inner: Arc::new(inner),
910 instruments_cache: Arc::new(DashMap::new()),
911 order_type_cache: Arc::new(DashMap::new()),
912 cache_initialized: AtomicBool::new(false),
913 clock: get_atomic_clock_realtime(),
914 })
915 }
916
917 pub fn from_env() -> anyhow::Result<Self> {
924 Self::with_credentials(
925 None, None, None, None, None, None, None, None, None, None, None,
926 )
927 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
928 }
929
930 #[allow(clippy::too_many_arguments)]
940 pub fn with_credentials(
941 api_key: Option<String>,
942 api_secret: Option<String>,
943 base_url: Option<String>,
944 timeout_secs: Option<u64>,
945 max_retries: Option<u32>,
946 retry_delay_ms: Option<u64>,
947 retry_delay_max_ms: Option<u64>,
948 recv_window_ms: Option<u64>,
949 max_requests_per_second: Option<u32>,
950 max_requests_per_minute: Option<u32>,
951 proxy_url: Option<String>,
952 ) -> anyhow::Result<Self> {
953 let testnet = base_url.as_ref().is_some_and(|url| url.contains("testnet"));
955
956 let (key_var, secret_var) = credential_env_vars(testnet);
957
958 let api_key = get_or_env_var_opt(api_key, key_var);
959 let api_secret = get_or_env_var_opt(api_secret, secret_var);
960
961 if api_key.is_some() && api_secret.is_none() {
963 anyhow::bail!("{secret_var} is required when {key_var} is provided");
964 }
965
966 if api_key.is_none() && api_secret.is_some() {
967 anyhow::bail!("{key_var} is required when {secret_var} is provided");
968 }
969
970 Self::new(
971 base_url,
972 api_key,
973 api_secret,
974 testnet,
975 timeout_secs,
976 max_retries,
977 retry_delay_ms,
978 retry_delay_max_ms,
979 recv_window_ms,
980 max_requests_per_second,
981 max_requests_per_minute,
982 proxy_url,
983 )
984 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
985 }
986
987 #[must_use]
989 pub fn base_url(&self) -> &str {
990 self.inner.base_url.as_str()
991 }
992
993 #[must_use]
995 pub fn api_key(&self) -> Option<&str> {
996 self.inner.credential.as_ref().map(|c| c.api_key())
997 }
998
999 #[must_use]
1001 pub fn api_key_masked(&self) -> Option<String> {
1002 self.inner.credential.as_ref().map(|c| c.api_key_masked())
1003 }
1004
1005 pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
1013 self.inner.get_server_time().await
1014 }
1015
1016 pub async fn cancel_all_after(&self, timeout_ms: u64) -> anyhow::Result<()> {
1024 let params = PostCancelAllAfterParams {
1025 timeout: timeout_ms,
1026 };
1027 self.inner.cancel_all_after(params).await?;
1028 Ok(())
1029 }
1030
1031 fn generate_ts_init(&self) -> UnixNanos {
1033 self.clock.get_time_ns()
1034 }
1035
1036 fn is_contingent_order(contingency_type: ContingencyType) -> bool {
1038 matches!(
1039 contingency_type,
1040 ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
1041 )
1042 }
1043
1044 fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
1046 matches!(
1047 contingency_type,
1048 ContingencyType::Oco | ContingencyType::Oto
1049 )
1050 }
1051
/// Fills in `linked_order_ids` and `parent_order_id` on contingent order
/// status reports by grouping the reports in `reports`.
///
/// Two groupings are built in a first pass over all reports that carry a
/// client order ID:
///
/// - by `order_list_id` (every report with an order list ID), and
/// - by client order ID prefix, i.e. the text before the last `-` (contingent
///   reports only).
///
/// For each grouping the first report whose contingency type is a parent type
/// is remembered as the group's parent.
///
/// In a second pass, each contingent report that does not already have linked
/// IDs is linked to the other members of its order list group or, failing
/// that, its prefix group. A contingent report that has no peers in either
/// group is downgraded to `NoContingency`.
fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
    let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
    let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
    let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
    let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();

    // Pass 1: build the groups and record each group's parent.
    for report in reports.iter() {
        // Reports without a client order ID cannot be grouped.
        let Some(client_order_id) = report.client_order_id else {
            continue;
        };

        if let Some(order_list_id) = report.order_list_id {
            order_list_groups
                .entry(order_list_id)
                .or_default()
                .push(client_order_id);

            // `or_insert` keeps the first parent encountered for the list.
            if Self::is_parent_contingency(report.contingency_type) {
                order_list_parents
                    .entry(order_list_id)
                    .or_insert(client_order_id);
            }
        }

        // Only contingent reports take part in prefix grouping.
        if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
            && Self::is_contingent_order(report.contingency_type)
        {
            prefix_groups
                .entry(base.to_owned())
                .or_default()
                .push(client_order_id);

            if Self::is_parent_contingency(report.contingency_type) {
                prefix_parents
                    .entry(base.to_owned())
                    .or_insert(client_order_id);
            }
        }
    }

    // Pass 2: assign linked IDs and parents from the groups.
    for report in reports.iter_mut() {
        let Some(client_order_id) = report.client_order_id else {
            continue;
        };

        // Leave reports that already carry linked IDs untouched.
        if report.linked_order_ids.is_some() {
            continue;
        }

        if !Self::is_contingent_order(report.contingency_type) {
            continue;
        }

        // First choice: peers sharing the same order list ID.
        if let Some(order_list_id) = report.order_list_id
            && let Some(group) = order_list_groups.get(&order_list_id)
        {
            let mut linked: Vec<ClientOrderId> = group
                .iter()
                .copied()
                .filter(|candidate| candidate != &client_order_id)
                .collect();

            if !linked.is_empty() {
                if let Some(parent_id) = order_list_parents.get(&order_list_id) {
                    if client_order_id == *parent_id {
                        report.parent_order_id = None;
                    } else {
                        // Stable sort: the parent (key 0) moves to the front,
                        // the remaining IDs keep their relative order.
                        linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
                        report.parent_order_id = Some(*parent_id);
                    }
                } else {
                    report.parent_order_id = None;
                }

                log::trace!(
                    "BitMEX linked ids sourced from order list id: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, linked_order_ids={:?}",
                    client_order_id,
                    order_list_id,
                    report.contingency_type,
                    linked,
                );
                report.linked_order_ids = Some(linked);
                continue;
            }

            log::trace!(
                "BitMEX order list id group had no peers: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, order_list_group={:?}",
                client_order_id,
                order_list_id,
                report.contingency_type,
                group,
            );
            report.parent_order_id = None;
        } else if report.order_list_id.is_none() {
            report.parent_order_id = None;
        }

        // Second choice: peers sharing the same client order ID prefix.
        if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
            && let Some(group) = prefix_groups.get(base)
        {
            let mut linked: Vec<ClientOrderId> = group
                .iter()
                .copied()
                .filter(|candidate| candidate != &client_order_id)
                .collect();

            if !linked.is_empty() {
                if let Some(parent_id) = prefix_parents.get(base) {
                    if client_order_id == *parent_id {
                        report.parent_order_id = None;
                    } else {
                        // Stable sort: parent first, others in original order.
                        linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
                        report.parent_order_id = Some(*parent_id);
                    }
                } else {
                    report.parent_order_id = None;
                }

                log::trace!(
                    "BitMEX linked ids constructed from client order id prefix: client_order_id={:?}, contingency_type={:?}, base={}, linked_order_ids={:?}",
                    client_order_id,
                    report.contingency_type,
                    base,
                    linked,
                );
                report.linked_order_ids = Some(linked);
                continue;
            }

            log::trace!(
                "BitMEX client order id prefix group had no peers: client_order_id={:?}, contingency_type={:?}, base={}, prefix_group={:?}",
                client_order_id,
                report.contingency_type,
                base,
                group,
            );
            report.parent_order_id = None;
        } else if client_order_id.as_str().contains('-') {
            report.parent_order_id = None;
        }

        // No peers were found in either grouping. Non-contingent reports were
        // skipped above, so this condition holds for every report that
        // reaches this point: downgrade it to a plain order.
        if Self::is_contingent_order(report.contingency_type) {
            log::warn!(
                "BitMEX order status report missing linked ids after grouping: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}",
                report.client_order_id,
                report.order_list_id,
                report.contingency_type,
            );
            report.contingency_type = ContingencyType::NoContingency;
            report.parent_order_id = None;
        }

        report.linked_order_ids = None;
    }
}
1209
1210 pub fn cancel_all_requests(&self) {
1212 self.inner.cancel_all_requests();
1213 }
1214
1215 pub fn reset_cancellation_token(&self) {
1217 self.inner.reset_cancellation_token();
1218 }
1219
1220 pub fn cancellation_token(&self) -> CancellationToken {
1222 self.inner.cancellation_token()
1223 }
1224
1225 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1229 self.instruments_cache
1230 .insert(instrument.raw_symbol().inner(), instrument);
1231 self.cache_initialized.store(true, Ordering::Release);
1232 }
1233
1234 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1236 self.instruments_cache
1237 .get(symbol)
1238 .map(|entry| entry.value().clone())
1239 }
1240
1241 pub async fn request_instrument(
1249 &self,
1250 instrument_id: InstrumentId,
1251 ) -> anyhow::Result<Option<InstrumentAny>> {
1252 let response = self
1253 .inner
1254 .get_instrument(instrument_id.symbol.as_str())
1255 .await?;
1256
1257 let instrument = match response {
1258 Some(instrument) => instrument,
1259 None => return Ok(None),
1260 };
1261
1262 let ts_init = self.generate_ts_init();
1263
1264 match parse_instrument_any(&instrument, ts_init) {
1265 InstrumentParseResult::Ok(inst) => Ok(Some(*inst)),
1266 InstrumentParseResult::Unsupported {
1267 symbol,
1268 instrument_type,
1269 } => {
1270 log::debug!(
1271 "Instrument {symbol} has unsupported type {instrument_type:?}, returning None"
1272 );
1273 Ok(None)
1274 }
1275 InstrumentParseResult::Inactive { symbol, state } => {
1276 log::debug!("Instrument {symbol} is inactive (state={state}), returning None");
1277 Ok(None)
1278 }
1279 InstrumentParseResult::Failed {
1280 symbol,
1281 instrument_type,
1282 error,
1283 } => {
1284 log::error!(
1285 "Failed to parse instrument {symbol} (type={instrument_type:?}): {error}"
1286 );
1287 Ok(None)
1288 }
1289 }
1290 }
1291
1292 pub async fn request_instruments(
1298 &self,
1299 active_only: bool,
1300 ) -> anyhow::Result<Vec<InstrumentAny>> {
1301 let instruments = self.inner.get_instruments(active_only).await?;
1302 let ts_init = self.generate_ts_init();
1303
1304 let mut parsed_instruments = Vec::new();
1305 let mut skipped_count = 0;
1306 let mut inactive_count = 0;
1307 let mut failed_count = 0;
1308 let total_count = instruments.len();
1309
1310 for inst in instruments {
1311 match parse_instrument_any(&inst, ts_init) {
1312 InstrumentParseResult::Ok(instrument_any) => {
1313 parsed_instruments.push(*instrument_any);
1314 }
1315 InstrumentParseResult::Unsupported {
1316 symbol,
1317 instrument_type,
1318 } => {
1319 skipped_count += 1;
1320 log::debug!(
1321 "Skipping unsupported instrument type: symbol={symbol}, type={instrument_type:?}"
1322 );
1323 }
1324 InstrumentParseResult::Inactive { symbol, state } => {
1325 inactive_count += 1;
1326 log::debug!("Skipping inactive instrument: symbol={symbol}, state={state}");
1327 }
1328 InstrumentParseResult::Failed {
1329 symbol,
1330 instrument_type,
1331 error,
1332 } => {
1333 failed_count += 1;
1334 log::error!(
1335 "Failed to parse instrument: symbol={symbol}, type={instrument_type:?}, error={error}"
1336 );
1337 }
1338 }
1339 }
1340
1341 if skipped_count > 0 {
1342 log::info!(
1343 "Skipped {skipped_count} unsupported instrument type(s) out of {total_count} total"
1344 );
1345 }
1346
1347 if inactive_count > 0 {
1348 log::info!(
1349 "Skipped {inactive_count} inactive instrument(s) out of {total_count} total"
1350 );
1351 }
1352
1353 if failed_count > 0 {
1354 log::error!(
1355 "Instrument parse failures: {failed_count} failed out of {total_count} total ({} successfully parsed)",
1356 parsed_instruments.len()
1357 );
1358 }
1359
1360 Ok(parsed_instruments)
1361 }
1362
1363 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1369 let inner = self.inner.clone();
1370 inner.get_wallet().await
1371 }
1372
1373 pub async fn get_orders(
1379 &self,
1380 params: GetOrderParams,
1381 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1382 let inner = self.inner.clone();
1383 inner.get_orders(params).await
1384 }
1385
1386 fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1392 self.get_instrument(&symbol).ok_or_else(|| {
1393 anyhow::anyhow!(
1394 "Instrument {symbol} not found in cache, ensure instruments loaded first"
1395 )
1396 })
1397 }
1398
1399 pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1406 self.instrument_from_cache(symbol)
1407 .map(|instrument| instrument.price_precision())
1408 }
1409
1410 pub async fn get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1416 self.inner
1417 .get_margin(currency)
1418 .await
1419 .map_err(|e| anyhow::anyhow!(e))
1420 }
1421
1422 pub async fn get_all_margins(&self) -> anyhow::Result<Vec<BitmexMargin>> {
1428 self.inner
1429 .get_all_margins()
1430 .await
1431 .map_err(|e| anyhow::anyhow!(e))
1432 }
1433
1434 pub async fn request_account_state(
1440 &self,
1441 account_id: AccountId,
1442 ) -> anyhow::Result<AccountState> {
1443 let margins = self
1444 .inner
1445 .get_all_margins()
1446 .await
1447 .map_err(|e| anyhow::anyhow!(e))?;
1448
1449 let ts_init =
1450 UnixNanos::from(chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64);
1451
1452 let mut balances = Vec::with_capacity(margins.len());
1453 let mut latest_timestamp: Option<chrono::DateTime<chrono::Utc>> = None;
1454
1455 for margin in margins {
1456 if let Some(ts) = margin.timestamp {
1457 latest_timestamp = Some(latest_timestamp.map_or(ts, |prev| prev.max(ts)));
1458 }
1459
1460 let margin_msg = BitmexMarginMsg {
1461 account: margin.account,
1462 currency: margin.currency,
1463 risk_limit: margin.risk_limit,
1464 amount: margin.amount,
1465 prev_realised_pnl: margin.prev_realised_pnl,
1466 gross_comm: margin.gross_comm,
1467 gross_open_cost: margin.gross_open_cost,
1468 gross_open_premium: margin.gross_open_premium,
1469 gross_exec_cost: margin.gross_exec_cost,
1470 gross_mark_value: margin.gross_mark_value,
1471 risk_value: margin.risk_value,
1472 init_margin: margin.init_margin,
1473 maint_margin: margin.maint_margin,
1474 target_excess_margin: margin.target_excess_margin,
1475 realised_pnl: margin.realised_pnl,
1476 unrealised_pnl: margin.unrealised_pnl,
1477 wallet_balance: margin.wallet_balance,
1478 margin_balance: margin.margin_balance,
1479 margin_leverage: margin.margin_leverage,
1480 margin_used_pcnt: margin.margin_used_pcnt,
1481 excess_margin: margin.excess_margin,
1482 available_margin: margin.available_margin,
1483 withdrawable_margin: margin.withdrawable_margin,
1484 maker_fee_discount: None,
1485 taker_fee_discount: None,
1486 timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1487 foreign_margin_balance: None,
1488 foreign_requirement: None,
1489 };
1490
1491 balances.push(parse_account_balance(&margin_msg));
1492 }
1493
1494 if balances.is_empty() {
1495 anyhow::bail!("No margin data returned from BitMEX");
1496 }
1497
1498 let account_type = AccountType::Margin;
1499 let margins_vec = Vec::new();
1500 let is_reported = true;
1501 let event_id = UUID4::new();
1502
1503 let ts_event = latest_timestamp.map_or(ts_init, |ts| {
1505 UnixNanos::from(ts.timestamp_nanos_opt().unwrap_or_default() as u64)
1506 });
1507
1508 Ok(AccountState::new(
1509 account_id,
1510 account_type,
1511 balances,
1512 margins_vec,
1513 is_reported,
1514 event_id,
1515 ts_event,
1516 ts_init,
1517 None,
1518 ))
1519 }
1520
1521 #[allow(clippy::too_many_arguments)]
1528 pub async fn submit_order(
1529 &self,
1530 instrument_id: InstrumentId,
1531 client_order_id: ClientOrderId,
1532 order_side: OrderSide,
1533 order_type: OrderType,
1534 quantity: Quantity,
1535 time_in_force: TimeInForce,
1536 price: Option<Price>,
1537 trigger_price: Option<Price>,
1538 trigger_type: Option<TriggerType>,
1539 trailing_offset: Option<f64>,
1540 trailing_offset_type: Option<TrailingOffsetType>,
1541 display_qty: Option<Quantity>,
1542 post_only: bool,
1543 reduce_only: bool,
1544 order_list_id: Option<OrderListId>,
1545 contingency_type: Option<ContingencyType>,
1546 peg_price_type: Option<BitmexPegPriceType>,
1547 peg_offset_value: Option<f64>,
1548 ) -> anyhow::Result<OrderStatusReport> {
1549 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1550
1551 let mut params = super::query::PostOrderParamsBuilder::default();
1552 params.text(NAUTILUS_TRADER);
1553 params.symbol(instrument_id.symbol.as_str());
1554 params.cl_ord_id(client_order_id.as_str());
1555
1556 if order_side == OrderSide::NoOrderSide {
1557 anyhow::bail!("Order side must be Buy or Sell");
1558 }
1559 let side = BitmexSide::from(order_side.as_specified());
1560 params.side(side);
1561
1562 let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1563 params.ord_type(ord_type);
1564
1565 params.order_qty(quantity_to_u32(&quantity, &instrument));
1566
1567 let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1568 params.time_in_force(tif);
1569
1570 if let Some(price) = price {
1571 params.price(price.as_f64());
1572 }
1573
1574 if let Some(trigger_price) = trigger_price {
1575 params.stop_px(trigger_price.as_f64());
1576 }
1577
1578 if let Some(display_qty) = display_qty {
1579 params.display_qty(quantity_to_u32(&display_qty, &instrument));
1580 }
1581
1582 if let Some(order_list_id) = order_list_id {
1583 params.cl_ord_link_id(order_list_id.as_str());
1584 }
1585
1586 let is_trailing_stop = matches!(
1587 order_type,
1588 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
1589 );
1590
1591 if is_trailing_stop && let Some(offset) = trailing_offset {
1592 if let Some(offset_type) = trailing_offset_type
1593 && offset_type != TrailingOffsetType::Price
1594 {
1595 anyhow::bail!(
1596 "BitMEX only supports PRICE trailing offset type, was {offset_type:?}"
1597 );
1598 }
1599
1600 params.peg_price_type(BitmexPegPriceType::TrailingStopPeg);
1601
1602 let signed_offset = match order_side {
1604 OrderSide::Sell => -offset.abs(),
1605 OrderSide::Buy => offset.abs(),
1606 _ => offset,
1607 };
1608 params.peg_offset_value(signed_offset);
1609 }
1610
1611 if peg_price_type.is_none() && peg_offset_value.is_some() {
1613 anyhow::bail!("`peg_offset_value` requires `peg_price_type`");
1614 }
1615
1616 if let Some(peg_type) = peg_price_type {
1617 if order_type != OrderType::Limit {
1618 anyhow::bail!(
1619 "Pegged orders only supported for LIMIT order type, was {order_type:?}"
1620 );
1621 }
1622 params.ord_type(BitmexOrderType::Pegged);
1623 params.peg_price_type(peg_type);
1624
1625 if let Some(offset) = peg_offset_value {
1626 params.peg_offset_value(offset);
1627 }
1628 }
1629
1630 let mut exec_inst = Vec::new();
1631
1632 if post_only {
1633 exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1634 }
1635
1636 if reduce_only {
1637 exec_inst.push(BitmexExecInstruction::ReduceOnly);
1638 }
1639
1640 if (trigger_price.is_some() || is_trailing_stop)
1642 && let Some(trigger_type) = trigger_type
1643 {
1644 match trigger_type {
1645 TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1646 TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1647 TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1648 _ => {} }
1650 }
1651
1652 if !exec_inst.is_empty() {
1653 params.exec_inst(exec_inst);
1654 }
1655
1656 if let Some(contingency_type) = contingency_type {
1657 let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1658 params.contingency_type(bitmex_contingency);
1659 }
1660
1661 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1662
1663 let response = self.inner.place_order(params).await?;
1664
1665 let order: BitmexOrder = serde_json::from_value(response)?;
1666
1667 if order.ord_status == Some(BitmexOrderStatus::Rejected) {
1668 let reason = order
1669 .ord_rej_reason
1670 .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1671 anyhow::bail!("Order rejected: {reason}");
1672 }
1673
1674 self.order_type_cache.insert(client_order_id, order_type);
1676
1677 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1678 let ts_init = self.generate_ts_init();
1679
1680 parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1681 }
1682
1683 pub async fn cancel_order(
1693 &self,
1694 instrument_id: InstrumentId,
1695 client_order_id: Option<ClientOrderId>,
1696 venue_order_id: Option<VenueOrderId>,
1697 ) -> anyhow::Result<OrderStatusReport> {
1698 let mut params = super::query::DeleteOrderParamsBuilder::default();
1699 params.text(NAUTILUS_TRADER);
1700
1701 if let Some(venue_order_id) = venue_order_id {
1702 params.order_id(vec![venue_order_id.as_str().to_string()]);
1703 } else if let Some(client_order_id) = client_order_id {
1704 params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1705 } else {
1706 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1707 }
1708
1709 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1710
1711 let response = self.inner.cancel_orders(params).await?;
1712
1713 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1714 let order = orders
1715 .into_iter()
1716 .next()
1717 .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1718
1719 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1720 let ts_init = self.generate_ts_init();
1721
1722 parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1723 }
1724
1725 pub async fn cancel_orders(
1735 &self,
1736 instrument_id: InstrumentId,
1737 client_order_ids: Option<Vec<ClientOrderId>>,
1738 venue_order_ids: Option<Vec<VenueOrderId>>,
1739 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1740 let mut params = super::query::DeleteOrderParamsBuilder::default();
1741 params.text(NAUTILUS_TRADER);
1742
1743 if let Some(venue_order_ids) = venue_order_ids {
1746 if venue_order_ids.is_empty() {
1747 anyhow::bail!("venue_order_ids cannot be empty");
1748 }
1749 params.order_id(
1750 venue_order_ids
1751 .iter()
1752 .map(|id| id.to_string())
1753 .collect::<Vec<_>>(),
1754 );
1755 } else if let Some(client_order_ids) = client_order_ids {
1756 if client_order_ids.is_empty() {
1757 anyhow::bail!("client_order_ids cannot be empty");
1758 }
1759 params.cl_ord_id(
1760 client_order_ids
1761 .iter()
1762 .map(|id| id.to_string())
1763 .collect::<Vec<_>>(),
1764 );
1765 } else {
1766 anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1767 }
1768
1769 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1770
1771 let response = self.inner.cancel_orders(params).await?;
1772
1773 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1774
1775 let ts_init = self.generate_ts_init();
1776 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1777
1778 let mut reports = Vec::new();
1779
1780 for order in orders {
1781 reports.push(parse_order_status_report(
1782 &order,
1783 &instrument,
1784 &self.order_type_cache,
1785 ts_init,
1786 )?);
1787 }
1788
1789 Self::populate_linked_order_ids(&mut reports);
1790
1791 Ok(reports)
1792 }
1793
1794 pub async fn cancel_all_orders(
1804 &self,
1805 instrument_id: InstrumentId,
1806 order_side: Option<OrderSide>,
1807 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1808 let mut params = DeleteAllOrdersParamsBuilder::default();
1809 params.text(NAUTILUS_TRADER);
1810 params.symbol(instrument_id.symbol.as_str());
1811
1812 if let Some(side) = order_side {
1813 if side == OrderSide::NoOrderSide {
1814 log::debug!("Ignoring NoOrderSide filter for cancel_all_orders on {instrument_id}",);
1815 } else {
1816 let side = BitmexSide::from(side.as_specified());
1817 params.filter(serde_json::json!({
1818 "side": side
1819 }));
1820 }
1821 }
1822
1823 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1824
1825 let response = self.inner.cancel_all_orders(params).await?;
1826
1827 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1828
1829 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1830 let ts_init = self.generate_ts_init();
1831
1832 let mut reports = Vec::new();
1833
1834 for order in orders {
1835 reports.push(parse_order_status_report(
1836 &order,
1837 &instrument,
1838 &self.order_type_cache,
1839 ts_init,
1840 )?);
1841 }
1842
1843 Self::populate_linked_order_ids(&mut reports);
1844
1845 Ok(reports)
1846 }
1847
1848 pub async fn modify_order(
1859 &self,
1860 instrument_id: InstrumentId,
1861 client_order_id: Option<ClientOrderId>,
1862 venue_order_id: Option<VenueOrderId>,
1863 quantity: Option<Quantity>,
1864 price: Option<Price>,
1865 trigger_price: Option<Price>,
1866 ) -> anyhow::Result<OrderStatusReport> {
1867 let mut params = PutOrderParamsBuilder::default();
1868 params.text(NAUTILUS_TRADER);
1869
1870 if let Some(venue_order_id) = venue_order_id {
1872 params.order_id(venue_order_id.as_str());
1873 } else if let Some(client_order_id) = client_order_id {
1874 params.orig_cl_ord_id(client_order_id.as_str());
1875 } else {
1876 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1877 }
1878
1879 if let Some(quantity) = quantity {
1880 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1881 params.order_qty(quantity_to_u32(&quantity, &instrument));
1882 }
1883
1884 if let Some(price) = price {
1885 params.price(price.as_f64());
1886 }
1887
1888 if let Some(trigger_price) = trigger_price {
1889 params.stop_px(trigger_price.as_f64());
1890 }
1891
1892 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1893
1894 let response = self.inner.amend_order(params).await?;
1895
1896 let order: BitmexOrder = serde_json::from_value(response)?;
1897
1898 if order.ord_status == Some(BitmexOrderStatus::Rejected) {
1899 let reason = order
1900 .ord_rej_reason
1901 .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1902 anyhow::bail!("Order modification rejected: {reason}");
1903 }
1904
1905 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1906 let ts_init = self.generate_ts_init();
1907
1908 parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1909 }
1910
1911 pub async fn query_order(
1920 &self,
1921 instrument_id: InstrumentId,
1922 client_order_id: Option<ClientOrderId>,
1923 venue_order_id: Option<VenueOrderId>,
1924 ) -> anyhow::Result<Option<OrderStatusReport>> {
1925 let mut params = GetOrderParamsBuilder::default();
1926
1927 let filter_json = if let Some(client_order_id) = client_order_id {
1928 serde_json::json!({
1929 "clOrdID": client_order_id.to_string()
1930 })
1931 } else if let Some(venue_order_id) = venue_order_id {
1932 serde_json::json!({
1933 "orderID": venue_order_id.to_string()
1934 })
1935 } else {
1936 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1937 };
1938
1939 params.filter(filter_json);
1940 params.count(1); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1943
1944 let response = self.inner.get_orders(params).await?;
1945
1946 if response.is_empty() {
1947 return Ok(None);
1948 }
1949
1950 let order = &response[0];
1951
1952 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1953 let ts_init = self.generate_ts_init();
1954
1955 let report =
1956 parse_order_status_report(order, &instrument, &self.order_type_cache, ts_init)?;
1957
1958 Ok(Some(report))
1959 }
1960
1961 pub async fn request_order_status_report(
1970 &self,
1971 instrument_id: InstrumentId,
1972 client_order_id: Option<ClientOrderId>,
1973 venue_order_id: Option<VenueOrderId>,
1974 ) -> anyhow::Result<OrderStatusReport> {
1975 if venue_order_id.is_none() && client_order_id.is_none() {
1976 anyhow::bail!("Either venue_order_id or client_order_id must be provided");
1977 }
1978
1979 let mut params = GetOrderParamsBuilder::default();
1980 params.symbol(instrument_id.symbol.as_str());
1981
1982 if let Some(venue_order_id) = venue_order_id {
1983 params.filter(serde_json::json!({
1984 "orderID": venue_order_id.as_str()
1985 }));
1986 } else if let Some(client_order_id) = client_order_id {
1987 params.filter(serde_json::json!({
1988 "clOrdID": client_order_id.as_str()
1989 }));
1990 }
1991
1992 params.count(1i32);
1993 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1994
1995 let response = self.inner.get_orders(params).await?;
1996
1997 let order = response
1998 .into_iter()
1999 .next()
2000 .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
2001
2002 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2003 let ts_init = self.generate_ts_init();
2004
2005 parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
2006 }
2007
2008 pub async fn request_order_status_reports(
2017 &self,
2018 instrument_id: Option<InstrumentId>,
2019 open_only: bool,
2020 start: Option<DateTime<Utc>>,
2021 end: Option<DateTime<Utc>>,
2022 limit: Option<u32>,
2023 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2024 if let (Some(start), Some(end)) = (start, end) {
2025 anyhow::ensure!(
2026 start < end,
2027 "Invalid time range: start={start:?} end={end:?}",
2028 );
2029 }
2030
2031 let mut params = GetOrderParamsBuilder::default();
2032
2033 if let Some(instrument_id) = &instrument_id {
2034 params.symbol(instrument_id.symbol.as_str());
2035 }
2036
2037 if open_only {
2038 params.filter(serde_json::json!({
2039 "open": true
2040 }));
2041 }
2042
2043 if let Some(start) = start {
2044 params.start_time(start);
2045 }
2046
2047 if let Some(end) = end {
2048 params.end_time(end);
2049 }
2050
2051 if let Some(limit) = limit {
2052 params.count(limit as i32);
2053 } else {
2054 params.count(500); }
2056
2057 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2060
2061 let response = self.inner.get_orders(params).await?;
2062
2063 let ts_init = self.generate_ts_init();
2064
2065 let mut reports = Vec::new();
2066
2067 for order in response {
2068 if let Some(start) = start {
2069 match order.timestamp {
2070 Some(timestamp) if timestamp < start => continue,
2071 Some(_) => {}
2072 None => {
2073 log::debug!("Skipping order report without timestamp for bounded query");
2074 continue;
2075 }
2076 }
2077 }
2078
2079 if let Some(end) = end {
2080 match order.timestamp {
2081 Some(timestamp) if timestamp > end => continue,
2082 Some(_) => {}
2083 None => {
2084 log::debug!("Skipping order report without timestamp for bounded query");
2085 continue;
2086 }
2087 }
2088 }
2089
2090 let Some(symbol) = order.symbol else {
2092 log::warn!("Order response missing symbol, skipping");
2093 continue;
2094 };
2095
2096 let Ok(instrument) = self.instrument_from_cache(symbol) else {
2097 log::debug!("Skipping order report for instrument not in cache: symbol={symbol}");
2098 continue;
2099 };
2100
2101 match parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init) {
2102 Ok(report) => reports.push(report),
2103 Err(e) => log::error!("Failed to parse order status report: {e}"),
2104 }
2105 }
2106
2107 Self::populate_linked_order_ids(&mut reports);
2108
2109 Ok(reports)
2110 }
2111
2112 pub async fn request_trades(
2118 &self,
2119 instrument_id: InstrumentId,
2120 start: Option<DateTime<Utc>>,
2121 end: Option<DateTime<Utc>>,
2122 limit: Option<u32>,
2123 ) -> anyhow::Result<Vec<TradeTick>> {
2124 let mut params = GetTradeParamsBuilder::default();
2125 params.symbol(instrument_id.symbol.as_str());
2126
2127 if let Some(start) = start {
2128 params.start_time(start);
2129 }
2130
2131 if let Some(end) = end {
2132 params.end_time(end);
2133 }
2134
2135 if let (Some(start), Some(end)) = (start, end) {
2136 anyhow::ensure!(
2137 start < end,
2138 "Invalid time range: start={start:?} end={end:?}",
2139 );
2140 }
2141
2142 if let Some(limit) = limit {
2143 let clamped_limit = limit.min(1000);
2144 if limit > 1000 {
2145 log::warn!(
2146 "BitMEX trade request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
2147 );
2148 }
2149 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2150 }
2151 params.reverse(false);
2152 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2153
2154 let response = self.inner.get_trades(params).await?;
2155
2156 let ts_init = self.generate_ts_init();
2157
2158 let mut parsed_trades = Vec::new();
2159
2160 for trade in response {
2161 if let Some(start) = start
2162 && trade.timestamp < start
2163 {
2164 continue;
2165 }
2166
2167 if let Some(end) = end
2168 && trade.timestamp > end
2169 {
2170 continue;
2171 }
2172
2173 let Some(instrument) = self.get_instrument(&trade.symbol) else {
2174 log::error!(
2175 "Instrument {} not found in cache, skipping trade",
2176 trade.symbol
2177 );
2178 continue;
2179 };
2180
2181 match parse_trade(trade, &instrument, ts_init) {
2182 Ok(trade) => parsed_trades.push(trade),
2183 Err(e) => log::error!("Failed to parse trade: {e}"),
2184 }
2185 }
2186
2187 Ok(parsed_trades)
2188 }
2189
2190 pub async fn request_bars(
2197 &self,
2198 mut bar_type: BarType,
2199 start: Option<DateTime<Utc>>,
2200 end: Option<DateTime<Utc>>,
2201 limit: Option<u32>,
2202 partial: bool,
2203 ) -> anyhow::Result<Vec<Bar>> {
2204 bar_type = bar_type.standard();
2205
2206 anyhow::ensure!(
2207 bar_type.aggregation_source() == AggregationSource::External,
2208 "Only EXTERNAL aggregation bars are supported"
2209 );
2210 anyhow::ensure!(
2211 bar_type.spec().price_type == PriceType::Last,
2212 "Only LAST price type bars are supported"
2213 );
2214
2215 if let (Some(start), Some(end)) = (start, end) {
2216 anyhow::ensure!(
2217 start < end,
2218 "Invalid time range: start={start:?} end={end:?}"
2219 );
2220 }
2221
2222 let spec = bar_type.spec();
2223 let bin_size = match (spec.aggregation, spec.step.get()) {
2224 (BarAggregation::Minute, 1) => "1m",
2225 (BarAggregation::Minute, 5) => "5m",
2226 (BarAggregation::Hour, 1) => "1h",
2227 (BarAggregation::Day, 1) => "1d",
2228 _ => anyhow::bail!(
2229 "BitMEX does not support {}-{:?}-{:?} bars",
2230 spec.step.get(),
2231 spec.aggregation,
2232 spec.price_type,
2233 ),
2234 };
2235
2236 let instrument_id = bar_type.instrument_id();
2237 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2238
2239 let mut params = GetTradeBucketedParamsBuilder::default();
2240 params.symbol(instrument_id.symbol.as_str());
2241 params.bin_size(bin_size);
2242
2243 if partial {
2244 params.partial(true);
2245 }
2246
2247 if let Some(start) = start {
2248 params.start_time(start);
2249 }
2250
2251 if let Some(end) = end {
2252 params.end_time(end);
2253 }
2254
2255 if let Some(limit) = limit {
2256 let clamped_limit = limit.min(1000);
2257 if limit > 1000 {
2258 log::warn!(
2259 "BitMEX bar request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
2260 );
2261 }
2262 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2263 }
2264 params.reverse(false);
2265 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2266
2267 let response = self.inner.get_trade_bucketed(params).await?;
2268 let ts_init = self.generate_ts_init();
2269 let mut bars = Vec::new();
2270
2271 for bin in response {
2272 if let Some(start) = start
2273 && bin.timestamp < start
2274 {
2275 continue;
2276 }
2277
2278 if let Some(end) = end
2279 && bin.timestamp > end
2280 {
2281 continue;
2282 }
2283
2284 if bin.symbol != instrument_id.symbol.inner() {
2285 log::warn!(
2286 "Skipping trade bin for unexpected symbol: symbol={}, expected={}",
2287 bin.symbol,
2288 instrument_id.symbol,
2289 );
2290 continue;
2291 }
2292
2293 match parse_trade_bin(bin, &instrument, &bar_type, ts_init) {
2294 Ok(bar) => bars.push(bar),
2295 Err(e) => log::warn!("Failed to parse trade bin: {e}"),
2296 }
2297 }
2298
2299 Ok(bars)
2300 }
2301
2302 pub async fn request_fill_reports(
2308 &self,
2309 instrument_id: Option<InstrumentId>,
2310 start: Option<DateTime<Utc>>,
2311 end: Option<DateTime<Utc>>,
2312 limit: Option<u32>,
2313 ) -> anyhow::Result<Vec<FillReport>> {
2314 if let (Some(start), Some(end)) = (start, end) {
2315 anyhow::ensure!(
2316 start < end,
2317 "Invalid time range: start={start:?} end={end:?}",
2318 );
2319 }
2320
2321 let mut params = GetExecutionParamsBuilder::default();
2322
2323 if let Some(instrument_id) = instrument_id {
2324 params.symbol(instrument_id.symbol.as_str());
2325 }
2326
2327 if let Some(start) = start {
2328 params.start_time(start);
2329 }
2330
2331 if let Some(end) = end {
2332 params.end_time(end);
2333 }
2334
2335 if let Some(limit) = limit {
2336 params.count(limit as i32);
2337 } else {
2338 params.count(500); }
2340 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2343
2344 let response = self.inner.get_executions(params).await?;
2345
2346 let ts_init = self.generate_ts_init();
2347
2348 let mut reports = Vec::new();
2349
2350 for exec in response {
2351 if let Some(start) = start {
2352 match exec.transact_time {
2353 Some(timestamp) if timestamp < start => continue,
2354 Some(_) => {}
2355 None => {
2356 log::debug!("Skipping fill report without transact_time for bounded query");
2357 continue;
2358 }
2359 }
2360 }
2361
2362 if let Some(end) = end {
2363 match exec.transact_time {
2364 Some(timestamp) if timestamp > end => continue,
2365 Some(_) => {}
2366 None => {
2367 log::debug!("Skipping fill report without transact_time for bounded query");
2368 continue;
2369 }
2370 }
2371 }
2372
2373 let Some(symbol) = exec.symbol else {
2375 log::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2376 continue;
2377 };
2378 let symbol_str = symbol.to_string();
2379
2380 let instrument = match self.instrument_from_cache(symbol) {
2381 Ok(instrument) => instrument,
2382 Err(e) => {
2383 log::error!(
2384 "Instrument not found in cache for execution parsing: symbol={symbol_str}, {e}"
2385 );
2386 continue;
2387 }
2388 };
2389
2390 match parse_fill_report(exec, &instrument, ts_init) {
2391 Ok(report) => reports.push(report),
2392 Err(e) => {
2393 let error_msg = e.to_string();
2395 if error_msg.starts_with("Skipping non-trade execution")
2396 || error_msg.starts_with("Skipping execution without order_id")
2397 {
2398 log::debug!("{e}");
2399 } else {
2400 log::error!("Failed to parse fill report: {e}");
2401 }
2402 }
2403 }
2404 }
2405
2406 Ok(reports)
2407 }
2408
2409 pub async fn request_position_status_reports(
2415 &self,
2416 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2417 let params = GetPositionParamsBuilder::default()
2418 .count(500) .build()
2420 .map_err(|e| anyhow::anyhow!(e))?;
2421
2422 let response = self.inner.get_positions(params).await?;
2423
2424 let ts_init = self.generate_ts_init();
2425
2426 let mut reports = Vec::new();
2427
2428 for pos in response {
2429 let symbol = pos.symbol;
2430 let instrument = match self.instrument_from_cache(symbol) {
2431 Ok(instrument) => instrument,
2432 Err(e) => {
2433 log::error!(
2434 "Instrument not found in cache for position parsing: symbol={}, {e}",
2435 pos.symbol.as_str(),
2436 );
2437 continue;
2438 }
2439 };
2440
2441 match parse_position_report(pos, &instrument, ts_init) {
2442 Ok(report) => reports.push(report),
2443 Err(e) => log::error!("Failed to parse position report: {e}"),
2444 }
2445 }
2446
2447 Ok(reports)
2448 }
2449
2450 pub async fn update_position_leverage(
2458 &self,
2459 symbol: &str,
2460 leverage: f64,
2461 ) -> anyhow::Result<PositionStatusReport> {
2462 let params = PostPositionLeverageParams {
2463 symbol: symbol.to_string(),
2464 leverage,
2465 target_account_id: None,
2466 };
2467
2468 let response = self.inner.update_position_leverage(params).await?;
2469
2470 let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2471 let ts_init = self.generate_ts_init();
2472
2473 parse_position_report(response, &instrument, ts_init)
2474 }
2475}
2476
2477#[cfg(test)]
2478mod tests {
2479 use nautilus_core::UUID4;
2480 use nautilus_model::enums::OrderStatus;
2481 use rstest::rstest;
2482 use serde_json::json;
2483
2484 use super::*;
2485
2486 fn build_report(
2487 client_order_id: &str,
2488 venue_order_id: &str,
2489 contingency_type: ContingencyType,
2490 order_list_id: Option<&str>,
2491 ) -> OrderStatusReport {
2492 let mut report = OrderStatusReport::new(
2493 AccountId::from("BITMEX-1"),
2494 InstrumentId::from("XBTUSD.BITMEX"),
2495 Some(ClientOrderId::from(client_order_id)),
2496 VenueOrderId::from(venue_order_id),
2497 OrderSide::Buy,
2498 OrderType::Limit,
2499 TimeInForce::Gtc,
2500 OrderStatus::Accepted,
2501 Quantity::new(100.0, 0),
2502 Quantity::default(),
2503 UnixNanos::from(1_u64),
2504 UnixNanos::from(1_u64),
2505 UnixNanos::from(1_u64),
2506 Some(UUID4::new()),
2507 );
2508
2509 if let Some(id) = order_list_id {
2510 report = report.with_order_list_id(OrderListId::from(id));
2511 }
2512
2513 report.with_contingency_type(contingency_type)
2514 }
2515
2516 #[rstest]
2517 fn test_sign_request_generates_correct_headers() {
2518 let client = BitmexRawHttpClient::with_credentials(
2519 "test_api_key".to_string(),
2520 "test_api_secret".to_string(),
2521 "http://localhost:8080".to_string(),
2522 Some(60),
2523 None, None, None, None, None, None, None, )
2531 .expect("Failed to create test client");
2532
2533 let headers = client
2534 .sign_request(&Method::GET, "/api/v1/order", None)
2535 .unwrap();
2536
2537 assert!(headers.contains_key("api-key"));
2538 assert!(headers.contains_key("api-signature"));
2539 assert!(headers.contains_key("api-expires"));
2540 assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
2541 }
2542
2543 #[rstest]
2544 fn test_sign_request_with_body() {
2545 let client = BitmexRawHttpClient::with_credentials(
2546 "test_api_key".to_string(),
2547 "test_api_secret".to_string(),
2548 "http://localhost:8080".to_string(),
2549 Some(60),
2550 None, None, None, None, None, None, None, )
2558 .expect("Failed to create test client");
2559
2560 let body = json!({"symbol": "XBTUSD", "orderQty": 100});
2561 let body_bytes = serde_json::to_vec(&body).unwrap();
2562
2563 let headers_without_body = client
2564 .sign_request(&Method::POST, "/api/v1/order", None)
2565 .unwrap();
2566 let headers_with_body = client
2567 .sign_request(&Method::POST, "/api/v1/order", Some(&body_bytes))
2568 .unwrap();
2569
2570 assert_ne!(
2572 headers_without_body.get("api-signature").unwrap(),
2573 headers_with_body.get("api-signature").unwrap()
2574 );
2575 }
2576
2577 #[rstest]
2578 fn test_sign_request_uses_custom_recv_window() {
2579 let client_default = BitmexRawHttpClient::with_credentials(
2580 "test_api_key".to_string(),
2581 "test_api_secret".to_string(),
2582 "http://localhost:8080".to_string(),
2583 Some(60),
2584 None,
2585 None,
2586 None,
2587 None, None, None, None, )
2592 .expect("Failed to create test client");
2593
2594 let client_custom = BitmexRawHttpClient::with_credentials(
2595 "test_api_key".to_string(),
2596 "test_api_secret".to_string(),
2597 "http://localhost:8080".to_string(),
2598 Some(60),
2599 None,
2600 None,
2601 None,
2602 Some(30_000), None, None, None, )
2607 .expect("Failed to create test client");
2608
2609 let headers_default = client_default
2610 .sign_request(&Method::GET, "/api/v1/order", None)
2611 .unwrap();
2612 let headers_custom = client_custom
2613 .sign_request(&Method::GET, "/api/v1/order", None)
2614 .unwrap();
2615
2616 let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
2618 let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();
2619
2620 let now = Utc::now().timestamp();
2622 assert!(expires_default > now);
2623 assert!(expires_custom > now);
2624
2625 assert!(expires_custom > expires_default);
2627
2628 let diff = expires_custom - expires_default;
2631 assert!((18..=25).contains(&diff));
2632 }
2633
2634 #[rstest]
2635 fn test_populate_linked_order_ids_from_order_list() {
2636 let base = "O-20250922-002219-001-000";
2637 let entry = format!("{base}-1");
2638 let stop = format!("{base}-2");
2639 let take = format!("{base}-3");
2640
2641 let mut reports = vec![
2642 build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
2643 build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
2644 build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
2645 ];
2646
2647 BitmexHttpClient::populate_linked_order_ids(&mut reports);
2648
2649 assert_eq!(
2650 reports[0].linked_order_ids,
2651 Some(vec![
2652 ClientOrderId::from(stop.as_str()),
2653 ClientOrderId::from(take.as_str()),
2654 ]),
2655 );
2656 assert_eq!(
2657 reports[1].linked_order_ids,
2658 Some(vec![
2659 ClientOrderId::from(entry.as_str()),
2660 ClientOrderId::from(take.as_str()),
2661 ]),
2662 );
2663 assert_eq!(
2664 reports[2].linked_order_ids,
2665 Some(vec![
2666 ClientOrderId::from(entry.as_str()),
2667 ClientOrderId::from(stop.as_str()),
2668 ]),
2669 );
2670 }
2671
2672 #[rstest]
2673 fn test_populate_linked_order_ids_from_id_prefix() {
2674 let base = "O-20250922-002220-001-000";
2675 let entry = format!("{base}-1");
2676 let stop = format!("{base}-2");
2677 let take = format!("{base}-3");
2678
2679 let mut reports = vec![
2680 build_report(&entry, "V-1", ContingencyType::Oto, None),
2681 build_report(&stop, "V-2", ContingencyType::Ouo, None),
2682 build_report(&take, "V-3", ContingencyType::Ouo, None),
2683 ];
2684
2685 BitmexHttpClient::populate_linked_order_ids(&mut reports);
2686
2687 assert_eq!(
2688 reports[0].linked_order_ids,
2689 Some(vec![
2690 ClientOrderId::from(stop.as_str()),
2691 ClientOrderId::from(take.as_str()),
2692 ]),
2693 );
2694 assert_eq!(
2695 reports[1].linked_order_ids,
2696 Some(vec![
2697 ClientOrderId::from(entry.as_str()),
2698 ClientOrderId::from(take.as_str()),
2699 ]),
2700 );
2701 assert_eq!(
2702 reports[2].linked_order_ids,
2703 Some(vec![
2704 ClientOrderId::from(entry.as_str()),
2705 ClientOrderId::from(stop.as_str()),
2706 ]),
2707 );
2708 }
2709
2710 #[rstest]
2711 fn test_populate_linked_order_ids_respects_non_contingent_orders() {
2712 let base = "O-20250922-002221-001-000";
2713 let entry = format!("{base}-1");
2714 let passive = format!("{base}-2");
2715
2716 let mut reports = vec![
2717 build_report(&entry, "V-1", ContingencyType::NoContingency, None),
2718 build_report(&passive, "V-2", ContingencyType::Ouo, None),
2719 ];
2720
2721 BitmexHttpClient::populate_linked_order_ids(&mut reports);
2722
2723 assert!(reports[0].linked_order_ids.is_none());
2725
2726 assert!(reports[1].linked_order_ids.is_none());
2728 assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
2729 }
2730}