1use std::{
54 collections::HashMap,
55 fmt::Debug,
56 num::NonZeroU32,
57 sync::{Arc, LazyLock},
58};
59
60use chrono::{DateTime, Utc};
61use nautilus_core::{
62 UnixNanos,
63 consts::NAUTILUS_USER_AGENT,
64 time::{AtomicTime, get_atomic_clock_realtime},
65};
66use nautilus_model::{
67 data::{Bar, BarType, BookOrder, OrderBookDelta, OrderBookDeltas, TradeTick},
68 enums::{
69 AggregationSource, BarAggregation, BookAction, OrderSide as NautilusOrderSide, PriceType,
70 RecordFlag,
71 },
72 events::AccountState,
73 identifiers::{AccountId, InstrumentId},
74 instruments::{Instrument, InstrumentAny},
75 reports::{FillReport, OrderStatusReport, PositionStatusReport},
76 types::{Price, Quantity},
77};
78use nautilus_network::{
79 http::{HttpClient, Method, USER_AGENT},
80 ratelimiter::quota::Quota,
81 retry::{RetryConfig, RetryManager},
82};
83use rust_decimal::Decimal;
84use serde::{Deserialize, Serialize, de::DeserializeOwned};
85use tokio_util::sync::CancellationToken;
86
87use super::error::DydxHttpError;
88use crate::{
89 common::{
90 consts::{DYDX_HTTP_URL, DYDX_TESTNET_HTTP_URL},
91 enums::DydxCandleResolution,
92 instrument_cache::InstrumentCache,
93 parse::extract_raw_symbol,
94 },
95 http::parse::{parse_account_state_from_http, parse_instrument_any},
96};
97
/// Maximum number of candles requested from dYdX in a single call.
///
/// `DydxHttpClient::request_bars` uses this value both as the per-request `limit` cap and
/// to size the time window covered by each paginated chunk.
const DYDX_MAX_BARS_PER_REQUEST: u32 = 1_000;
100
101fn bar_type_to_resolution(bar_type: &BarType) -> anyhow::Result<DydxCandleResolution> {
102 if bar_type.aggregation_source() != AggregationSource::External {
103 anyhow::bail!(
104 "dYdX only supports EXTERNAL aggregation, was {:?}",
105 bar_type.aggregation_source()
106 );
107 }
108
109 let spec = bar_type.spec();
110 if spec.price_type != PriceType::Last {
111 anyhow::bail!(
112 "dYdX only supports LAST price type, was {:?}",
113 spec.price_type
114 );
115 }
116
117 DydxCandleResolution::from_bar_spec(&spec)
118}
119
/// Rate-limit quota applied to the dYdX REST client: 10 requests per second.
///
/// Built lazily on first access and passed to the HTTP client in
/// `DydxRawHttpClient::new`.
pub static DYDX_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
    Quota::per_second(NonZeroU32::new(10).expect("non-zero")).expect("valid constant")
});
128
/// Generic (de)serializable envelope holding a payload of type `T` under a `data` key.
#[derive(Debug, Serialize, Deserialize)]
pub struct DydxResponse<T> {
    /// The wrapped payload.
    pub data: T,
}
138
/// Low-level dYdX HTTP client that issues requests and deserializes the JSON responses.
///
/// Requests go through a rate-limited [`HttpClient`], are retried by a [`RetryManager`],
/// and can be aborted through a shared [`CancellationToken`].
pub struct DydxRawHttpClient {
    /// URL prefix prepended to every endpoint path.
    base_url: String,
    /// Underlying HTTP client (configured with the user agent header and rate quota).
    client: HttpClient,
    /// Retry policy executor used for every request.
    retry_manager: RetryManager<DydxHttpError>,
    /// Token passed to the retry manager so in-flight requests can be cancelled.
    cancellation_token: CancellationToken,
    /// Whether this client was created for the testnet environment.
    is_testnet: bool,
}
155
156impl Default for DydxRawHttpClient {
157 fn default() -> Self {
158 Self::new(None, Some(60), None, false, None)
159 .expect("Failed to create default DydxRawHttpClient")
160 }
161}
162
163impl Debug for DydxRawHttpClient {
164 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165 f.debug_struct(stringify!(DydxRawHttpClient))
166 .field("base_url", &self.base_url)
167 .field("is_testnet", &self.is_testnet)
168 .finish_non_exhaustive()
169 }
170}
171
172impl DydxRawHttpClient {
    /// Cancels the client's cancellation token.
    ///
    /// The same token is handed to the retry manager for every request, so this signals
    /// cancellation to requests issued through this client.
    pub fn cancel_all_requests(&self) {
        self.cancellation_token.cancel();
    }
177
    /// Returns a reference to the cancellation token shared by all requests of this client.
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
182
183 pub fn new(
192 base_url: Option<String>,
193 timeout_secs: Option<u64>,
194 proxy_url: Option<String>,
195 is_testnet: bool,
196 retry_config: Option<RetryConfig>,
197 ) -> anyhow::Result<Self> {
198 let base_url = if is_testnet {
199 base_url.unwrap_or_else(|| DYDX_TESTNET_HTTP_URL.to_string())
200 } else {
201 base_url.unwrap_or_else(|| DYDX_HTTP_URL.to_string())
202 };
203
204 let retry_manager = RetryManager::new(retry_config.unwrap_or_default());
205
206 let mut headers = HashMap::new();
207 headers.insert(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
208
209 let client = HttpClient::new(
210 headers,
211 vec![], vec![], Some(*DYDX_REST_QUOTA),
214 timeout_secs,
215 proxy_url,
216 )
217 .map_err(|e| {
218 DydxHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
219 })?;
220
221 Ok(Self {
222 base_url,
223 client,
224 retry_manager,
225 cancellation_token: CancellationToken::new(),
226 is_testnet,
227 })
228 }
229
    /// Returns `true` if this client was created for the testnet environment.
    #[must_use]
    pub const fn is_testnet(&self) -> bool {
        self.is_testnet
    }
235
    /// Returns the base URL that is prepended to every endpoint path.
    #[must_use]
    pub fn base_url(&self) -> &str {
        &self.base_url
    }
241
242 pub async fn send_request<T>(
254 &self,
255 method: Method,
256 endpoint: &str,
257 query_params: Option<&str>,
258 ) -> Result<T, DydxHttpError>
259 where
260 T: DeserializeOwned,
261 {
262 let url = if let Some(params) = query_params {
263 format!("{}{endpoint}?{params}", self.base_url)
264 } else {
265 format!("{}{endpoint}", self.base_url)
266 };
267
268 let operation = || async {
269 let request = self
270 .client
271 .request_with_ustr_keys(
272 method.clone(),
273 url.clone(),
274 None, None, None, None, None, )
280 .await
281 .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
282
283 if !request.status.is_success() {
284 return Err(DydxHttpError::HttpStatus {
285 status: request.status.as_u16(),
286 message: String::from_utf8_lossy(&request.body).to_string(),
287 });
288 }
289
290 Ok(request)
291 };
292
293 let should_retry = |error: &DydxHttpError| -> bool {
298 match error {
299 DydxHttpError::HttpClientError(_) => true,
300 DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
301 _ => false,
302 }
303 };
304
305 let create_error = |msg: String| -> DydxHttpError {
306 if msg == "canceled" {
307 DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
308 } else if msg.contains("Timed out") {
309 DydxHttpError::HttpClientError(msg)
311 } else {
312 DydxHttpError::ValidationError(msg)
313 }
314 };
315
316 let response = self
317 .retry_manager
318 .execute_with_retry_with_cancel(
319 endpoint,
320 operation,
321 should_retry,
322 create_error,
323 &self.cancellation_token,
324 )
325 .await?;
326
327 serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
328 error: e.to_string(),
329 body: String::from_utf8_lossy(&response.body).to_string(),
330 })
331 }
332
333 pub async fn send_post_request<T, B>(
346 &self,
347 endpoint: &str,
348 body: &B,
349 ) -> Result<T, DydxHttpError>
350 where
351 T: DeserializeOwned,
352 B: Serialize,
353 {
354 let url = format!("{}{endpoint}", self.base_url);
355
356 let body_bytes = serde_json::to_vec(body).map_err(|e| DydxHttpError::Serialization {
357 error: e.to_string(),
358 })?;
359
360 let operation = || async {
361 let request = self
362 .client
363 .request_with_ustr_keys(
364 Method::POST,
365 url.clone(),
366 None, None, Some(body_bytes.clone()),
369 None, None, )
372 .await
373 .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
374
375 if !request.status.is_success() {
376 return Err(DydxHttpError::HttpStatus {
377 status: request.status.as_u16(),
378 message: String::from_utf8_lossy(&request.body).to_string(),
379 });
380 }
381
382 Ok(request)
383 };
384
385 let should_retry = |error: &DydxHttpError| -> bool {
387 match error {
388 DydxHttpError::HttpClientError(_) => true,
389 DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
390 _ => false,
391 }
392 };
393
394 let create_error = |msg: String| -> DydxHttpError {
395 if msg == "canceled" {
396 DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
397 } else if msg.contains("Timed out") {
398 DydxHttpError::HttpClientError(msg)
400 } else {
401 DydxHttpError::ValidationError(msg)
402 }
403 };
404
405 let response = self
406 .retry_manager
407 .execute_with_retry_with_cancel(
408 endpoint,
409 operation,
410 should_retry,
411 create_error,
412 &self.cancellation_token,
413 )
414 .await?;
415
416 serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
417 error: e.to_string(),
418 body: String::from_utf8_lossy(&response.body).to_string(),
419 })
420 }
421
    /// Fetches all perpetual markets from `GET /v4/perpetualMarkets`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails or the response cannot be deserialized.
    pub async fn get_markets(&self) -> Result<super::models::MarketsResponse, DydxHttpError> {
        self.send_request(Method::GET, "/v4/perpetualMarkets", None)
            .await
    }
431
432 pub async fn get_market(
440 &self,
441 ticker: &str,
442 ) -> Result<super::models::MarketsResponse, DydxHttpError> {
443 let query = format!("ticker={ticker}");
444 self.send_request(Method::GET, "/v4/perpetualMarkets", Some(&query))
445 .await
446 }
447
    /// Fetches the order book for `ticker` from `GET /v4/orderbooks/perpetualMarket/{ticker}`.
    ///
    /// The ticker is inserted into the path as given (no encoding is applied here).
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails or the response cannot be deserialized.
    pub async fn get_orderbook(
        &self,
        ticker: &str,
    ) -> Result<super::models::OrderbookResponse, DydxHttpError> {
        let endpoint = format!("/v4/orderbooks/perpetualMarket/{ticker}");
        self.send_request(Method::GET, &endpoint, None).await
    }
460
461 pub async fn get_trades(
467 &self,
468 ticker: &str,
469 limit: Option<u32>,
470 starting_before_or_at_height: Option<u64>,
471 ) -> Result<super::models::TradesResponse, DydxHttpError> {
472 let endpoint = format!("/v4/trades/perpetualMarket/{ticker}");
473 let mut query_parts = Vec::new();
474
475 if let Some(l) = limit {
476 query_parts.push(format!("limit={l}"));
477 }
478
479 if let Some(height) = starting_before_or_at_height {
480 query_parts.push(format!("createdBeforeOrAtHeight={height}"));
481 }
482 let query = if query_parts.is_empty() {
483 None
484 } else {
485 Some(query_parts.join("&"))
486 };
487 self.send_request(Method::GET, &endpoint, query.as_deref())
488 .await
489 }
490
491 pub async fn get_candles(
497 &self,
498 ticker: &str,
499 resolution: DydxCandleResolution,
500 limit: Option<u32>,
501 from_iso: Option<DateTime<Utc>>,
502 to_iso: Option<DateTime<Utc>>,
503 ) -> Result<super::models::CandlesResponse, DydxHttpError> {
504 let endpoint = format!("/v4/candles/perpetualMarkets/{ticker}");
505 let mut query_parts = vec![format!("resolution={resolution}")];
506
507 if let Some(l) = limit {
508 query_parts.push(format!("limit={l}"));
509 }
510
511 if let Some(from) = from_iso {
512 let from_str = from.to_rfc3339();
513 query_parts.push(format!("fromISO={}", urlencoding::encode(&from_str)));
514 }
515
516 if let Some(to) = to_iso {
517 let to_str = to.to_rfc3339();
518 query_parts.push(format!("toISO={}", urlencoding::encode(&to_str)));
519 }
520 let query = query_parts.join("&");
521 self.send_request(Method::GET, &endpoint, Some(&query))
522 .await
523 }
524
    /// Fetches a subaccount from
    /// `GET /v4/addresses/{address}/subaccountNumber/{subaccount_number}`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails or the response cannot be deserialized.
    pub async fn get_subaccount(
        &self,
        address: &str,
        subaccount_number: u32,
    ) -> Result<super::models::SubaccountResponse, DydxHttpError> {
        let endpoint = format!("/v4/addresses/{address}/subaccountNumber/{subaccount_number}");
        self.send_request(Method::GET, &endpoint, None).await
    }
538
539 pub async fn get_fills(
545 &self,
546 address: &str,
547 subaccount_number: u32,
548 market: Option<&str>,
549 limit: Option<u32>,
550 ) -> Result<super::models::FillsResponse, DydxHttpError> {
551 let endpoint = "/v4/fills";
552 let mut query_parts = vec![
553 format!("address={address}"),
554 format!("subaccountNumber={subaccount_number}"),
555 ];
556
557 if let Some(m) = market {
558 query_parts.push(format!("market={m}"));
559 query_parts.push("marketType=PERPETUAL".to_string());
560 }
561
562 if let Some(l) = limit {
563 query_parts.push(format!("limit={l}"));
564 }
565 let query = query_parts.join("&");
566 self.send_request(Method::GET, endpoint, Some(&query)).await
567 }
568
569 pub async fn get_orders(
575 &self,
576 address: &str,
577 subaccount_number: u32,
578 market: Option<&str>,
579 limit: Option<u32>,
580 ) -> Result<super::models::OrdersResponse, DydxHttpError> {
581 let endpoint = "/v4/orders";
582 let mut query_parts = vec![
583 format!("address={address}"),
584 format!("subaccountNumber={subaccount_number}"),
585 ];
586
587 if let Some(m) = market {
588 query_parts.push(format!("market={m}"));
589 query_parts.push("marketType=PERPETUAL".to_string());
590 }
591
592 if let Some(l) = limit {
593 query_parts.push(format!("limit={l}"));
594 }
595 let query = query_parts.join("&");
596 self.send_request(Method::GET, endpoint, Some(&query)).await
597 }
598
599 pub async fn get_transfers(
605 &self,
606 address: &str,
607 subaccount_number: u32,
608 limit: Option<u32>,
609 ) -> Result<super::models::TransfersResponse, DydxHttpError> {
610 let endpoint = "/v4/transfers";
611 let mut query_parts = vec![
612 format!("address={address}"),
613 format!("subaccountNumber={subaccount_number}"),
614 ];
615
616 if let Some(l) = limit {
617 query_parts.push(format!("limit={l}"));
618 }
619 let query = query_parts.join("&");
620 self.send_request(Method::GET, endpoint, Some(&query)).await
621 }
622
    /// Fetches the server time from `GET /v4/time`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails or the response cannot be deserialized.
    pub async fn get_time(&self) -> Result<super::models::TimeResponse, DydxHttpError> {
        self.send_request(Method::GET, "/v4/time", None).await
    }
631
    /// Fetches the current height from `GET /v4/height`.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails or the response cannot be deserialized.
    pub async fn get_height(&self) -> Result<super::models::HeightResponse, DydxHttpError> {
        self.send_request(Method::GET, "/v4/height", None).await
    }
640}
641
/// Domain-level dYdX HTTP client.
///
/// Wraps a shared [`DydxRawHttpClient`] and an [`InstrumentCache`], and converts raw API
/// responses into Nautilus domain types (instruments, bars, trade ticks, reports).
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx", from_py_object)
)]
pub struct DydxHttpClient {
    /// Shared raw client used to perform the actual HTTP requests.
    pub(crate) inner: Arc<DydxRawHttpClient>,
    /// Shared cache of instruments (and their market data) used for lookups and parsing.
    pub(crate) instrument_cache: Arc<InstrumentCache>,
    /// Clock used to stamp `ts_init` on produced objects.
    clock: &'static AtomicTime,
}
672
673impl Clone for DydxHttpClient {
674 fn clone(&self) -> Self {
675 Self {
676 inner: self.inner.clone(),
677 instrument_cache: Arc::clone(&self.instrument_cache),
678 clock: self.clock,
679 }
680 }
681}
682
683impl Default for DydxHttpClient {
684 fn default() -> Self {
685 Self::new(None, Some(60), None, false, None)
686 .expect("Failed to create default DydxHttpClient")
687 }
688}
689
690impl DydxHttpClient {
691 pub fn new(
704 base_url: Option<String>,
705 timeout_secs: Option<u64>,
706 proxy_url: Option<String>,
707 is_testnet: bool,
708 retry_config: Option<RetryConfig>,
709 ) -> anyhow::Result<Self> {
710 Self::new_with_cache(
711 base_url,
712 timeout_secs,
713 proxy_url,
714 is_testnet,
715 retry_config,
716 Arc::new(InstrumentCache::new()),
717 )
718 }
719
720 pub fn new_with_cache(
733 base_url: Option<String>,
734 timeout_secs: Option<u64>,
735 proxy_url: Option<String>,
736 is_testnet: bool,
737 retry_config: Option<RetryConfig>,
738 instrument_cache: Arc<InstrumentCache>,
739 ) -> anyhow::Result<Self> {
740 Ok(Self {
741 inner: Arc::new(DydxRawHttpClient::new(
742 base_url,
743 timeout_secs,
744 proxy_url,
745 is_testnet,
746 retry_config,
747 )?),
748 instrument_cache,
749 clock: get_atomic_clock_realtime(),
750 })
751 }
752
753 pub async fn request_instruments(
763 &self,
764 symbol: Option<String>,
765 maker_fee: Option<Decimal>,
766 taker_fee: Option<Decimal>,
767 ) -> anyhow::Result<Vec<InstrumentAny>> {
768 let markets_response = self.inner.get_markets().await?;
769 let ts_init = self.generate_ts_init();
770
771 let mut instruments = Vec::new();
772 let mut skipped_inactive = 0;
773
774 for (ticker, market) in markets_response.markets {
775 if let Some(ref sym) = symbol
777 && ticker != *sym
778 {
779 continue;
780 }
781
782 if !super::parse::is_market_active(&market.status) {
783 log::debug!(
784 "Skipping inactive market {ticker} (status: {:?})",
785 market.status
786 );
787 skipped_inactive += 1;
788 continue;
789 }
790
791 match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
792 Ok(instrument) => {
793 instruments.push(instrument);
794 }
795 Err(e) => {
796 log::error!("Failed to parse instrument {ticker}: {e}");
797 }
798 }
799 }
800
801 if skipped_inactive > 0 {
802 log::info!(
803 "Parsed {} instruments, skipped {} inactive",
804 instruments.len(),
805 skipped_inactive
806 );
807 } else {
808 log::debug!("Parsed {} instruments", instruments.len());
809 }
810
811 Ok(instruments)
812 }
813
814 pub async fn fetch_and_cache_instruments(&self) -> anyhow::Result<()> {
826 let markets_response = self.inner.get_markets().await?;
828 let ts_init = self.generate_ts_init();
829
830 let mut parsed_instruments = Vec::new();
831 let mut parsed_markets = Vec::new();
832 let mut skipped_inactive = 0;
833
834 for (ticker, market) in markets_response.markets {
835 if !super::parse::is_market_active(&market.status) {
836 log::debug!(
837 "Skipping inactive market {ticker} (status: {:?})",
838 market.status
839 );
840 skipped_inactive += 1;
841 continue;
842 }
843
844 match super::parse::parse_instrument_any(&market, None, None, ts_init) {
845 Ok(instrument) => {
846 parsed_instruments.push(instrument);
847 parsed_markets.push(market);
848 }
849 Err(e) => {
850 log::error!("Failed to parse instrument {ticker}: {e}");
851 }
852 }
853 }
854
855 self.instrument_cache.clear();
857
858 let items: Vec<_> = parsed_instruments.into_iter().zip(parsed_markets).collect();
860
861 if !items.is_empty() {
862 self.instrument_cache.insert_many(items.clone());
863 }
864
865 let count = items.len();
866
867 if skipped_inactive > 0 {
868 log::info!("Cached {count} instruments, skipped {skipped_inactive} inactive");
869 } else {
870 log::info!("Cached {count} instruments");
871 }
872
873 Ok(())
874 }
875
876 pub async fn fetch_and_cache_single_instrument(
882 &self,
883 ticker: &str,
884 ) -> anyhow::Result<Option<InstrumentAny>> {
885 let markets_response = self.inner.get_market(ticker).await?;
886 let ts_init = self.generate_ts_init();
887
888 if let Some(market) = markets_response.markets.get(ticker) {
890 if !super::parse::is_market_active(&market.status) {
891 log::debug!(
892 "Skipping inactive market {ticker} (status: {:?})",
893 market.status
894 );
895 return Ok(None);
896 }
897
898 let instrument = parse_instrument_any(market, None, None, ts_init)?;
899 self.instrument_cache
900 .insert(instrument.clone(), market.clone());
901
902 log::info!("Fetched and cached new instrument: {ticker}");
903 return Ok(Some(instrument));
904 }
905
906 Ok(None)
907 }
908
    /// Adds the given instruments to the cache via
    /// `InstrumentCache::insert_instruments_only`.
    // NOTE(review): presumably no market parameters are stored for these entries —
    // confirm against `InstrumentCache`.
    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
        self.instrument_cache.insert_instruments_only(instruments);
    }

    /// Adds a single instrument to the cache via
    /// `InstrumentCache::insert_instrument_only`.
    pub fn cache_instrument(&self, instrument: InstrumentAny) {
        self.instrument_cache.insert_instrument_only(instrument);
    }
924
    /// Looks up a cached instrument by its instrument ID.
    #[must_use]
    pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
        self.instrument_cache.get(instrument_id)
    }

    /// Looks up a cached instrument by its CLOB pair ID.
    #[must_use]
    pub fn get_instrument_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
        self.instrument_cache.get_by_clob_id(clob_pair_id)
    }

    /// Looks up a cached instrument by its market ticker.
    #[must_use]
    pub fn get_instrument_by_market(&self, ticker: &str) -> Option<InstrumentAny> {
        self.instrument_cache.get_by_market(ticker)
    }

    /// Returns the cached market parameters for the given instrument ID, if any.
    #[must_use]
    pub fn get_market_params(
        &self,
        instrument_id: &InstrumentId,
    ) -> Option<super::models::PerpetualMarket> {
        self.instrument_cache.get_market_params(instrument_id)
    }
962
963 pub async fn request_trades(
972 &self,
973 symbol: &str,
974 limit: Option<u32>,
975 starting_before_or_at_height: Option<u64>,
976 ) -> anyhow::Result<super::models::TradesResponse> {
977 self.inner
978 .get_trades(symbol, limit, starting_before_or_at_height)
979 .await
980 .map_err(Into::into)
981 }
982
983 pub async fn request_candles(
992 &self,
993 symbol: &str,
994 resolution: DydxCandleResolution,
995 limit: Option<u32>,
996 from_iso: Option<DateTime<Utc>>,
997 to_iso: Option<DateTime<Utc>>,
998 ) -> anyhow::Result<super::models::CandlesResponse> {
999 self.inner
1000 .get_candles(symbol, resolution, limit, from_iso, to_iso)
1001 .await
1002 .map_err(Into::into)
1003 }
1004
1005 pub async fn request_bars(
1023 &self,
1024 bar_type: BarType,
1025 start: Option<DateTime<Utc>>,
1026 end: Option<DateTime<Utc>>,
1027 limit: Option<u32>,
1028 timestamp_on_close: bool,
1029 ) -> anyhow::Result<Vec<Bar>> {
1030 let resolution = bar_type_to_resolution(&bar_type)?;
1031 let instrument_id = bar_type.instrument_id();
1032
1033 let instrument = self
1034 .get_instrument(&instrument_id)
1035 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1036
1037 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1038 let price_precision = instrument.price_precision();
1039 let size_precision = instrument.size_precision();
1040 let ts_init = self.generate_ts_init();
1041
1042 let mut all_bars: Vec<Bar> = Vec::new();
1043
1044 let spec = bar_type.spec();
1046 let bar_secs: i64 = match spec.aggregation {
1047 BarAggregation::Minute => spec.step.get() as i64 * 60,
1048 BarAggregation::Hour => spec.step.get() as i64 * 3_600,
1049 BarAggregation::Day => spec.step.get() as i64 * 86_400,
1050 _ => anyhow::bail!("Unsupported aggregation: {:?}", spec.aggregation),
1051 };
1052
1053 match (start, end) {
1054 (Some(range_start), Some(range_end)) if range_end > range_start => {
1056 let overall_limit = limit.unwrap_or(u32::MAX);
1057 let mut remaining = overall_limit;
1058 let bars_per_call = DYDX_MAX_BARS_PER_REQUEST.min(remaining);
1059 let chunk_duration = chrono::Duration::seconds(bar_secs * bars_per_call as i64);
1060 let mut chunk_start = range_start;
1061
1062 while chunk_start < range_end && remaining > 0 {
1063 let chunk_end = (chunk_start + chunk_duration).min(range_end);
1064 let per_call_limit = remaining.min(DYDX_MAX_BARS_PER_REQUEST);
1065
1066 let response = self
1067 .inner
1068 .get_candles(
1069 ticker,
1070 resolution,
1071 Some(per_call_limit),
1072 Some(chunk_start),
1073 Some(chunk_end),
1074 )
1075 .await?;
1076
1077 let count = response.candles.len() as u32;
1078 if count == 0 {
1079 break;
1080 }
1081
1082 for candle in &response.candles {
1083 match super::parse::parse_bar(
1084 candle,
1085 bar_type,
1086 price_precision,
1087 size_precision,
1088 timestamp_on_close,
1089 ts_init,
1090 ) {
1091 Ok(bar) => all_bars.push(bar),
1092 Err(e) => log::warn!("Failed to parse candle for {instrument_id}: {e}"),
1093 }
1094 }
1095
1096 if remaining <= count {
1097 break;
1098 }
1099 remaining -= count;
1100 chunk_start += chunk_duration;
1101 }
1102 }
1103 _ => {
1105 let req_limit = limit.unwrap_or(DYDX_MAX_BARS_PER_REQUEST);
1106 let response = self
1107 .inner
1108 .get_candles(ticker, resolution, Some(req_limit), None, None)
1109 .await?;
1110
1111 for candle in &response.candles {
1112 match super::parse::parse_bar(
1113 candle,
1114 bar_type,
1115 price_precision,
1116 size_precision,
1117 timestamp_on_close,
1118 ts_init,
1119 ) {
1120 Ok(bar) => all_bars.push(bar),
1121 Err(e) => log::warn!("Failed to parse candle for {instrument_id}: {e}"),
1122 }
1123 }
1124 }
1125 }
1126
1127 let current_time_ns = self.generate_ts_init();
1129 all_bars.retain(|bar| bar.ts_event < current_time_ns);
1130
1131 Ok(all_bars)
1132 }
1133
1134 pub async fn request_trade_ticks(
1152 &self,
1153 instrument_id: InstrumentId,
1154 start: Option<DateTime<Utc>>,
1155 end: Option<DateTime<Utc>>,
1156 limit: Option<u32>,
1157 ) -> anyhow::Result<Vec<TradeTick>> {
1158 const DYDX_MAX_TRADES_PER_REQUEST: u32 = 1_000;
1159 const DYDX_BLOCK_TIME_SECS: f64 = 1.1;
1160
1161 if let (Some(s), Some(e)) = (start, end) {
1163 anyhow::ensure!(s < e, "start ({s}) must be before end ({e})");
1164 }
1165
1166 let instrument = self
1167 .get_instrument(&instrument_id)
1168 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1169
1170 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1171 let price_precision = instrument.price_precision();
1172 let size_precision = instrument.size_precision();
1173 let ts_init = self.generate_ts_init();
1174
1175 let initial_cursor = if let Some(end_time) = end {
1179 match self.inner.get_height().await {
1180 Ok(height_resp) => {
1181 let secs_ahead = (height_resp.time - end_time).num_seconds();
1182 if secs_ahead > 0 {
1183 let blocks_to_skip = (secs_ahead as f64 / DYDX_BLOCK_TIME_SECS) as u64;
1184 let target = height_resp.height.saturating_sub(blocks_to_skip);
1185 log::debug!(
1186 "Estimated block height at {end_time}: {target} \
1187 (current: {}, skipping ~{blocks_to_skip} blocks)",
1188 height_resp.height,
1189 );
1190 Some(target)
1191 } else {
1192 None }
1194 }
1195 Err(e) => {
1196 log::warn!(
1197 "Failed to get block height for time skip, paginating from latest: {e}"
1198 );
1199 None
1200 }
1201 }
1202 } else {
1203 None
1204 };
1205
1206 let overall_limit = limit.unwrap_or(u32::MAX);
1207 let mut remaining = overall_limit;
1208 let mut cursor_height: Option<u64> = initial_cursor;
1209 let mut all_trades = Vec::new();
1210
1211 loop {
1212 let page_limit = remaining.min(DYDX_MAX_TRADES_PER_REQUEST);
1213 let response = self
1214 .inner
1215 .get_trades(ticker, Some(page_limit), cursor_height)
1216 .await?;
1217
1218 let page_count = response.trades.len() as u32;
1219 if page_count == 0 {
1220 break;
1221 }
1222
1223 let oldest_trade = response.trades.last().unwrap();
1225
1226 cursor_height = Some(oldest_trade.created_at_height.saturating_sub(1));
1228
1229 if let Some(s) = start
1231 && oldest_trade.created_at < s
1232 {
1233 for trade in &response.trades {
1235 if start.is_some_and(|s| trade.created_at < s) {
1236 continue;
1237 }
1238
1239 if end.is_some_and(|e| trade.created_at > e) {
1240 continue;
1241 }
1242 all_trades.push(super::parse::parse_trade_tick(
1243 trade,
1244 instrument_id,
1245 price_precision,
1246 size_precision,
1247 ts_init,
1248 )?);
1249 }
1250 break;
1251 }
1252
1253 for trade in &response.trades {
1255 if start.is_some_and(|s| trade.created_at < s) {
1256 continue;
1257 }
1258
1259 if end.is_some_and(|e| trade.created_at > e) {
1260 continue;
1261 }
1262 all_trades.push(super::parse::parse_trade_tick(
1263 trade,
1264 instrument_id,
1265 price_precision,
1266 size_precision,
1267 ts_init,
1268 )?);
1269 }
1270
1271 remaining = remaining.saturating_sub(page_count);
1272
1273 if page_count < page_limit || remaining == 0 {
1275 break;
1276 }
1277 }
1278
1279 all_trades.reverse();
1281 all_trades.dedup_by(|a, b| a.trade_id == b.trade_id);
1282
1283 if let Some(lim) = limit {
1285 all_trades.truncate(lim as usize);
1286 }
1287
1288 Ok(all_trades)
1289 }
1290
1291 pub async fn request_orderbook_snapshot(
1302 &self,
1303 instrument_id: InstrumentId,
1304 ) -> anyhow::Result<OrderBookDeltas> {
1305 let instrument = self
1306 .get_instrument(&instrument_id)
1307 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1308
1309 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1310 let response = self.inner.get_orderbook(ticker).await?;
1311
1312 let ts_init = self.generate_ts_init();
1313
1314 let mut deltas = Vec::with_capacity(1 + response.bids.len() + response.asks.len());
1315
1316 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1317
1318 for (i, level) in response.bids.iter().enumerate() {
1319 let is_last = i == response.bids.len() - 1 && response.asks.is_empty();
1320 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1321
1322 let order = BookOrder::new(
1323 NautilusOrderSide::Buy,
1324 Price::from_decimal_dp(level.price, instrument.price_precision())?,
1325 Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1326 0,
1327 );
1328
1329 deltas.push(OrderBookDelta::new(
1330 instrument_id,
1331 BookAction::Add,
1332 order,
1333 flags,
1334 0,
1335 ts_init,
1336 ts_init,
1337 ));
1338 }
1339
1340 for (i, level) in response.asks.iter().enumerate() {
1341 let is_last = i == response.asks.len() - 1;
1342 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1343
1344 let order = BookOrder::new(
1345 NautilusOrderSide::Sell,
1346 Price::from_decimal_dp(level.price, instrument.price_precision())?,
1347 Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1348 0,
1349 );
1350
1351 deltas.push(OrderBookDelta::new(
1352 instrument_id,
1353 BookAction::Add,
1354 order,
1355 flags,
1356 0,
1357 ts_init,
1358 ts_init,
1359 ));
1360 }
1361
1362 Ok(OrderBookDeltas::new(instrument_id, deltas))
1363 }
1364
    /// Returns a reference to the shared raw HTTP client.
    #[must_use]
    pub fn raw_client(&self) -> &Arc<DydxRawHttpClient> {
        &self.inner
    }

    /// Returns `true` if the underlying raw client was created for testnet.
    #[must_use]
    pub fn is_testnet(&self) -> bool {
        self.inner.is_testnet()
    }

    /// Returns the base URL used by the underlying raw client.
    #[must_use]
    pub fn base_url(&self) -> &str {
        self.inner.base_url()
    }

    /// Returns whether the instrument cache reports itself as initialized.
    #[must_use]
    pub fn is_cache_initialized(&self) -> bool {
        self.instrument_cache.is_initialized()
    }

    /// Returns the number of entries reported by the instrument cache.
    #[must_use]
    pub fn cached_instruments_count(&self) -> usize {
        self.instrument_cache.len()
    }

    /// Returns a reference to the shared instrument cache.
    #[must_use]
    pub fn instrument_cache(&self) -> &Arc<InstrumentCache> {
        &self.instrument_cache
    }

    /// Returns all instruments currently held by the instrument cache.
    #[must_use]
    pub fn all_instruments(&self) -> Vec<InstrumentAny> {
        self.instrument_cache.all_instruments()
    }

    /// Returns the IDs of all instruments currently held by the instrument cache.
    #[must_use]
    pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
        self.instrument_cache.all_instrument_ids()
    }
1420
    /// Returns the current time of the client's clock in nanoseconds, used as `ts_init`.
    fn generate_ts_init(&self) -> UnixNanos {
        self.clock.get_time_ns()
    }
1424
1425 pub async fn request_order_status_reports(
1434 &self,
1435 address: &str,
1436 subaccount_number: u32,
1437 account_id: AccountId,
1438 instrument_id: Option<InstrumentId>,
1439 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1440 let ts_init = self.generate_ts_init();
1441
1442 let market = instrument_id.map(|id| {
1444 let symbol = id.symbol.to_string();
1445 symbol.trim_end_matches("-PERP").to_string()
1447 });
1448
1449 let orders = self
1450 .inner
1451 .get_orders(address, subaccount_number, market.as_deref(), None)
1452 .await?;
1453
1454 let mut reports = Vec::new();
1455
1456 for order in orders {
1457 let instrument = match self.get_instrument_by_clob_id(order.clob_pair_id) {
1459 Some(inst) => inst,
1460 None => {
1461 log::warn!(
1462 "Skipping order {}: no cached instrument for clob_pair_id {}",
1463 order.id,
1464 order.clob_pair_id
1465 );
1466 continue;
1467 }
1468 };
1469
1470 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1472 continue;
1473 }
1474
1475 match super::parse::parse_order_status_report(&order, &instrument, account_id, ts_init)
1476 {
1477 Ok(report) => reports.push(report),
1478 Err(e) => {
1479 log::warn!("Failed to parse order {}: {e}", order.id);
1480 }
1481 }
1482 }
1483
1484 Ok(reports)
1485 }
1486
1487 pub async fn request_fill_reports(
1496 &self,
1497 address: &str,
1498 subaccount_number: u32,
1499 account_id: AccountId,
1500 instrument_id: Option<InstrumentId>,
1501 ) -> anyhow::Result<Vec<FillReport>> {
1502 let ts_init = self.generate_ts_init();
1503
1504 let market = instrument_id.map(|id| {
1506 let symbol = id.symbol.to_string();
1507 symbol.trim_end_matches("-PERP").to_string()
1508 });
1509
1510 let fills_response = self
1511 .inner
1512 .get_fills(address, subaccount_number, market.as_deref(), None)
1513 .await?;
1514
1515 let mut reports = Vec::new();
1516
1517 for fill in fills_response.fills {
1518 let instrument = match self.get_instrument_by_market(&fill.market) {
1520 Some(inst) => inst,
1521 None => {
1522 log::warn!(
1523 "Skipping fill {}: no cached instrument for market {}",
1524 fill.id,
1525 fill.market
1526 );
1527 continue;
1528 }
1529 };
1530
1531 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1533 continue;
1534 }
1535
1536 match super::parse::parse_fill_report(&fill, &instrument, account_id, ts_init) {
1537 Ok(report) => reports.push(report),
1538 Err(e) => {
1539 log::warn!("Failed to parse fill {}: {e}", fill.id);
1540 }
1541 }
1542 }
1543
1544 Ok(reports)
1545 }
1546
1547 pub async fn request_position_status_reports(
1556 &self,
1557 address: &str,
1558 subaccount_number: u32,
1559 account_id: AccountId,
1560 instrument_id: Option<InstrumentId>,
1561 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1562 let ts_init = self.generate_ts_init();
1563
1564 let subaccount_response = self
1565 .inner
1566 .get_subaccount(address, subaccount_number)
1567 .await?;
1568
1569 let mut reports = Vec::new();
1570
1571 for (market, position) in subaccount_response.subaccount.open_perpetual_positions {
1572 let instrument = match self.get_instrument_by_market(&market) {
1574 Some(inst) => inst,
1575 None => {
1576 log::warn!("Skipping position: no cached instrument for market {market}");
1577 continue;
1578 }
1579 };
1580
1581 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1583 continue;
1584 }
1585
1586 match super::parse::parse_position_status_report(
1587 &position,
1588 &instrument,
1589 account_id,
1590 ts_init,
1591 ) {
1592 Ok(report) => reports.push(report),
1593 Err(e) => {
1594 log::warn!("Failed to parse position for {market}: {e}");
1595 }
1596 }
1597 }
1598
1599 Ok(reports)
1600 }
1601
1602 pub async fn request_account_state(
1611 &self,
1612 address: &str,
1613 subaccount_number: u32,
1614 account_id: AccountId,
1615 ) -> anyhow::Result<AccountState> {
1616 let ts_init = self.generate_ts_init();
1617 let subaccount_response = self
1618 .inner
1619 .get_subaccount(address, subaccount_number)
1620 .await?;
1621
1622 let instruments: HashMap<InstrumentId, InstrumentAny> = self
1624 .instrument_cache
1625 .all_instruments()
1626 .into_iter()
1627 .map(|inst| (inst.id(), inst))
1628 .collect();
1629
1630 let oracle_prices = self.instrument_cache.to_oracle_prices_map();
1632
1633 parse_account_state_from_http(
1634 &subaccount_response.subaccount,
1635 account_id,
1636 &instruments,
1637 &oracle_prices,
1638 ts_init,
1639 ts_init,
1640 )
1641 }
1642}
1643
#[cfg(test)]
mod tests {
    //! Construction, cloning, lookup and timeout tests for the raw and domain clients.

    use axum::{Router, routing::get};
    use nautilus_model::identifiers::{Symbol, Venue};
    use rstest::rstest;

    use super::*;
    use crate::http::error;

    /// A raw client built without a base URL targets the mainnet endpoint.
    #[tokio::test]
    async fn test_raw_client_creation() {
        let built = DydxRawHttpClient::new(None, Some(30), None, false, None);
        assert!(built.is_ok());

        let raw = built.unwrap();
        assert!(!raw.is_testnet());
        assert_eq!(raw.base_url(), DYDX_HTTP_URL);
    }

    /// A raw client built with the testnet flag targets the testnet endpoint.
    #[tokio::test]
    async fn test_raw_client_testnet() {
        let built = DydxRawHttpClient::new(None, Some(30), None, true, None);
        assert!(built.is_ok());

        let raw = built.unwrap();
        assert!(raw.is_testnet());
        assert_eq!(raw.base_url(), DYDX_TESTNET_HTTP_URL);
    }

    /// A fresh domain client targets mainnet and starts with an empty, uninitialized cache.
    #[tokio::test]
    async fn test_domain_client_creation() {
        let built = DydxHttpClient::new(None, Some(30), None, false, None);
        assert!(built.is_ok());

        let domain = built.unwrap();
        assert!(!domain.is_testnet());
        assert_eq!(domain.base_url(), DYDX_HTTP_URL);
        assert!(!domain.is_cache_initialized());
        assert_eq!(domain.cached_instruments_count(), 0);
    }

    /// A domain client built with the testnet flag targets the testnet endpoint.
    #[tokio::test]
    async fn test_domain_client_testnet() {
        let built = DydxHttpClient::new(None, Some(30), None, true, None);
        assert!(built.is_ok());

        let domain = built.unwrap();
        assert!(domain.is_testnet());
        assert_eq!(domain.base_url(), DYDX_TESTNET_HTTP_URL);
    }

    /// `Default` yields a mainnet client whose cache is not yet initialized.
    #[tokio::test]
    async fn test_domain_client_default() {
        let domain = DydxHttpClient::default();
        assert!(!domain.is_testnet());
        assert_eq!(domain.base_url(), DYDX_HTTP_URL);
        assert!(!domain.is_cache_initialized());
    }

    /// Clones report the cache state of the client at the time they are inspected.
    #[tokio::test]
    async fn test_domain_client_clone() {
        let original = DydxHttpClient::new(None, Some(30), None, false, None).unwrap();

        // A clone taken before any insert reports an uninitialized cache.
        let clone_before = original.clone();
        assert!(!clone_before.is_cache_initialized());

        // Inserting, even an empty list, is expected to flag the cache as initialized.
        original.instrument_cache.insert_instruments_only(vec![]);

        // The clone is the subject under test, so the lint is silenced on purpose.
        #[allow(clippy::redundant_clone)]
        let clone_after = original.clone();
        assert!(clone_after.is_cache_initialized());
    }

    /// Looking up an instrument on an empty cache returns `None`.
    #[rstest]
    fn test_domain_client_get_instrument_not_found() {
        let domain = DydxHttpClient::default();
        let missing_id = InstrumentId::new(Symbol::new("ETH-USD-PERP"), Venue::new("DYDX"));
        let lookup = domain.get_instrument(&missing_id);
        assert!(lookup.is_none());
    }

    /// A request against a slow endpoint must fail well before the endpoint answers.
    #[tokio::test]
    async fn test_http_timeout_respects_configuration_and_does_not_block() {
        use std::time::{Duration, Instant};

        use tokio::net::TcpListener;

        // Handler that responds only after five seconds, far beyond the limits below.
        async fn slow_handler() -> &'static str {
            tokio::time::sleep(Duration::from_secs(5)).await;
            "ok"
        }

        let app = Router::new().route("/v4/slow", get(slow_handler));

        // Port 0 lets the OS choose a free port for the local test server.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local_addr = listener.local_addr().unwrap();

        tokio::spawn(async move {
            axum::serve(listener, app.into_make_service())
                .await
                .unwrap();
        });

        let base_url = format!("http://{local_addr}");

        // No retries and sub-second limits, so a failure surfaces quickly.
        let retry_config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 0,
            max_delay_ms: 0,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(500),
            immediate_first: true,
            max_elapsed_ms: Some(1_000),
        };

        // The 60 passed here is deliberately much larger than the retry limits above.
        let raw =
            DydxRawHttpClient::new(Some(base_url), Some(60), None, false, Some(retry_config))
                .unwrap();

        let started = Instant::now();
        let outcome: Result<serde_json::Value, error::DydxHttpError> =
            raw.send_request(Method::GET, "/v4/slow", None).await;
        let elapsed = started.elapsed();

        assert!(outcome.is_err());
        // Generous bound: well under the handler's five-second sleep.
        assert!(elapsed < Duration::from_secs(3));
    }
}