Skip to main content

nautilus_dydx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides an ergonomic wrapper around the **dYdX v4 Indexer REST API** –
17//! <https://docs.dydx.exchange/api_integration-indexer/indexer_api>.
18//!
19//! This module exports two complementary HTTP clients following the standardized
20//! two-layer architecture pattern established in OKX, Bybit, and BitMEX adapters:
21//!
22//! - [`DydxRawHttpClient`]: Low-level HTTP methods matching dYdX Indexer API endpoints.
23//! - [`DydxHttpClient`]: High-level methods using Nautilus domain types with instrument caching.
24//!
25//! ## Two-Layer Architecture
26//!
27//! The raw client handles HTTP communication, rate limiting, retries, and basic response parsing.
28//! The domain client wraps the raw client in an `Arc`, maintains an instrument cache using `DashMap`,
29//! and provides high-level methods that work with Nautilus domain types.
30//!
31//! ## Responsibilities
32//!
33//! - Rate-limiting based on the public dYdX specification.
34//! - Zero-copy deserialization of large JSON payloads into domain models.
35//! - Conversion of raw exchange errors into the rich [`DydxHttpError`] enum.
36//! - Instrument caching with standard methods: `cache_instruments()`, `cache_instrument()`, `get_instrument()`.
37//!
38//! # Important Note
39//!
40//! The dYdX v4 Indexer REST API does **NOT** require authentication or request signing.
41//! All endpoints are publicly accessible using only wallet addresses and subaccount numbers
42//! as query parameters. Order submission and trading operations use gRPC with blockchain
43//! transaction signing, not REST API.
44//!
45//! # Official Documentation
46//!
47//! | Endpoint          | Reference                                                                 |
48//! |-------------------|---------------------------------------------------------------------------|
49//! | Market data       | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#markets>  |
50//! | Account data      | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#accounts> |
51//! | Utility endpoints | <https://docs.dydx.exchange/api_integration-indexer/indexer_api#utility>  |
52
53use std::{
54    collections::HashMap,
55    fmt::Debug,
56    num::NonZeroU32,
57    sync::{Arc, LazyLock},
58};
59
60use chrono::{DateTime, Utc};
61use nautilus_core::{
62    UnixNanos,
63    consts::NAUTILUS_USER_AGENT,
64    time::{AtomicTime, get_atomic_clock_realtime},
65};
66use nautilus_model::{
67    data::{Bar, BarType, BookOrder, OrderBookDelta, OrderBookDeltas, TradeTick},
68    enums::{
69        AggregationSource, BarAggregation, BookAction, OrderSide as NautilusOrderSide, PriceType,
70        RecordFlag,
71    },
72    events::AccountState,
73    identifiers::{AccountId, InstrumentId},
74    instruments::{Instrument, InstrumentAny},
75    reports::{FillReport, OrderStatusReport, PositionStatusReport},
76    types::{Price, Quantity},
77};
78use nautilus_network::{
79    http::{HttpClient, Method, USER_AGENT},
80    ratelimiter::quota::Quota,
81    retry::{RetryConfig, RetryManager},
82};
83use rust_decimal::Decimal;
84use serde::{Deserialize, Serialize, de::DeserializeOwned};
85use tokio_util::sync::CancellationToken;
86
87use super::error::DydxHttpError;
88use crate::{
89    common::{
90        consts::{DYDX_HTTP_URL, DYDX_TESTNET_HTTP_URL},
91        enums::DydxCandleResolution,
92        instrument_cache::InstrumentCache,
93        parse::extract_raw_symbol,
94    },
95    http::parse::{parse_account_state_from_http, parse_instrument_any},
96};
97
/// Maximum number of candles returned per dYdX API request.
///
/// NOTE(review): presumably used as the page size when splitting large bar
/// requests into several calls — the consumer is outside this section; confirm.
const DYDX_MAX_BARS_PER_REQUEST: u32 = 1_000;
100
101fn bar_type_to_resolution(bar_type: &BarType) -> anyhow::Result<DydxCandleResolution> {
102    if bar_type.aggregation_source() != AggregationSource::External {
103        anyhow::bail!(
104            "dYdX only supports EXTERNAL aggregation, was {:?}",
105            bar_type.aggregation_source()
106        );
107    }
108
109    let spec = bar_type.spec();
110    if spec.price_type != PriceType::Last {
111        anyhow::bail!(
112            "dYdX only supports LAST price type, was {:?}",
113            spec.price_type
114        );
115    }
116
117    DydxCandleResolution::from_bar_spec(&spec)
118}
119
120/// Default dYdX Indexer REST API rate limit.
121///
122/// The dYdX Indexer API rate limits are generous for read-only operations:
123/// - General: 100 requests per 10 seconds per IP
124/// - We use a conservative 10 requests per second as the default quota.
125pub static DYDX_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
126    Quota::per_second(NonZeroU32::new(10).expect("non-zero")).expect("valid constant")
127});
128
/// Represents a dYdX HTTP response wrapper.
///
/// Most dYdX Indexer API endpoints return data directly without a wrapper,
/// but some endpoints may use this structure for consistency.
///
/// The wrapper (de)serializes as an object with a single `data` field holding
/// the payload of type `T`.
#[derive(Debug, Serialize, Deserialize)]
pub struct DydxResponse<T> {
    /// The typed data returned by the dYdX endpoint.
    pub data: T,
}
138
/// Provides a raw HTTP client for interacting with the [dYdX v4](https://dydx.exchange) Indexer REST API.
///
/// This client wraps the underlying [`HttpClient`] to handle functionality
/// specific to dYdX Indexer API, such as rate-limiting, forming request URLs,
/// and deserializing responses into dYdX specific data models.
///
/// **Note**: Unlike traditional centralized exchanges, the dYdX v4 Indexer REST API
/// does NOT require authentication, API keys, or request signing. All endpoints are
/// publicly accessible.
pub struct DydxRawHttpClient {
    /// URL prefix that every endpoint path is appended to when building a request URL.
    base_url: String,
    /// Underlying HTTP client, constructed with the global [`DYDX_REST_QUOTA`].
    client: HttpClient,
    /// Drives the retry loop around each request.
    retry_manager: RetryManager<DydxHttpError>,
    /// Token handed to every retry loop so in-flight requests can be canceled.
    cancellation_token: CancellationToken,
    /// Whether the client was configured for testnet (selects the default base URL).
    is_testnet: bool,
}
155
156impl Default for DydxRawHttpClient {
157    fn default() -> Self {
158        Self::new(None, Some(60), None, false, None)
159            .expect("Failed to create default DydxRawHttpClient")
160    }
161}
162
163impl Debug for DydxRawHttpClient {
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        f.debug_struct(stringify!(DydxRawHttpClient))
166            .field("base_url", &self.base_url)
167            .field("is_testnet", &self.is_testnet)
168            .finish_non_exhaustive()
169    }
170}
171
172impl DydxRawHttpClient {
    /// Cancel all pending HTTP requests.
    ///
    /// Triggers the client's shared cancellation token, which is passed to the
    /// retry loop of every request issued by this client.
    ///
    /// NOTE(review): a `tokio_util` `CancellationToken` cannot be reset once
    /// canceled, so requests issued *after* this call presumably observe the
    /// canceled token as well — confirm this is the intended lifecycle.
    pub fn cancel_all_requests(&self) {
        self.cancellation_token.cancel();
    }
177
    /// Get the cancellation token for this client.
    ///
    /// Returns a reference to the same token that [`Self::cancel_all_requests`] triggers.
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
182
183    /// Creates a new [`DydxRawHttpClient`] using the default dYdX Indexer HTTP URL,
184    /// optionally overridden with a custom base URL.
185    ///
186    /// **Note**: No credentials are required as the dYdX Indexer API is publicly accessible.
187    ///
188    /// # Errors
189    ///
190    /// Returns an error if the retry manager cannot be created.
191    pub fn new(
192        base_url: Option<String>,
193        timeout_secs: Option<u64>,
194        proxy_url: Option<String>,
195        is_testnet: bool,
196        retry_config: Option<RetryConfig>,
197    ) -> anyhow::Result<Self> {
198        let base_url = if is_testnet {
199            base_url.unwrap_or_else(|| DYDX_TESTNET_HTTP_URL.to_string())
200        } else {
201            base_url.unwrap_or_else(|| DYDX_HTTP_URL.to_string())
202        };
203
204        let retry_manager = RetryManager::new(retry_config.unwrap_or_default());
205
206        let mut headers = HashMap::new();
207        headers.insert(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
208
209        let client = HttpClient::new(
210            headers,
211            vec![], // No specific headers to extract from responses
212            vec![], // No keyed quotas (we use a single global quota)
213            Some(*DYDX_REST_QUOTA),
214            timeout_secs,
215            proxy_url,
216        )
217        .map_err(|e| {
218            DydxHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
219        })?;
220
221        Ok(Self {
222            base_url,
223            client,
224            retry_manager,
225            cancellation_token: CancellationToken::new(),
226            is_testnet,
227        })
228    }
229
    /// Check if this client is configured for testnet.
    ///
    /// Returns the `is_testnet` flag supplied at construction time.
    #[must_use]
    pub const fn is_testnet(&self) -> bool {
        self.is_testnet
    }
235
    /// Get the base URL being used by this client.
    ///
    /// This is either the URL passed to the constructor or the network default
    /// selected there; endpoint paths are appended to it verbatim.
    #[must_use]
    pub fn base_url(&self) -> &str {
        &self.base_url
    }
241
242    /// Send a request to a dYdX Indexer API endpoint.
243    ///
244    /// **Note**: dYdX Indexer API does not require authentication headers.
245    ///
246    /// # Errors
247    ///
248    /// Returns an error if:
249    /// - The HTTP request fails.
250    /// - The response has a non-success HTTP status code.
251    /// - The response body cannot be deserialized to type `T`.
252    /// - The request is canceled.
253    pub async fn send_request<T>(
254        &self,
255        method: Method,
256        endpoint: &str,
257        query_params: Option<&str>,
258    ) -> Result<T, DydxHttpError>
259    where
260        T: DeserializeOwned,
261    {
262        let url = if let Some(params) = query_params {
263            format!("{}{endpoint}?{params}", self.base_url)
264        } else {
265            format!("{}{endpoint}", self.base_url)
266        };
267
268        let operation = || async {
269            let request = self
270                .client
271                .request_with_ustr_keys(
272                    method.clone(),
273                    url.clone(),
274                    None, // No params
275                    None, // No additional headers
276                    None, // No body for GET requests
277                    None, // Use default timeout
278                    None, // No specific rate limit keys (using global quota)
279                )
280                .await
281                .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
282
283            if !request.status.is_success() {
284                return Err(DydxHttpError::HttpStatus {
285                    status: request.status.as_u16(),
286                    message: String::from_utf8_lossy(&request.body).to_string(),
287                });
288            }
289
290            Ok(request)
291        };
292
293        // Retry strategy for dYdX Indexer API:
294        // 1. Network errors: always retry (transient connection issues)
295        // 2. HTTP 429/5xx: rate limiting and server errors should be retried
296        // 3. Client errors (4xx except 429): should NOT be retried
297        let should_retry = |error: &DydxHttpError| -> bool {
298            match error {
299                DydxHttpError::HttpClientError(_) => true,
300                DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
301                _ => false,
302            }
303        };
304
305        let create_error = |msg: String| -> DydxHttpError {
306            if msg == "canceled" {
307                DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
308            } else if msg.contains("Timed out") {
309                // Timeouts are transient — map to HttpClientError so they are retried
310                DydxHttpError::HttpClientError(msg)
311            } else {
312                DydxHttpError::ValidationError(msg)
313            }
314        };
315
316        let response = self
317            .retry_manager
318            .execute_with_retry_with_cancel(
319                endpoint,
320                operation,
321                should_retry,
322                create_error,
323                &self.cancellation_token,
324            )
325            .await?;
326
327        serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
328            error: e.to_string(),
329            body: String::from_utf8_lossy(&response.body).to_string(),
330        })
331    }
332
333    /// Send a POST request to a dYdX Indexer API endpoint.
334    ///
335    /// Note: Most dYdX Indexer endpoints are GET-based. POST is rarely used.
336    ///
337    /// # Errors
338    ///
339    /// Returns an error if:
340    /// - The request body cannot be serialized to JSON.
341    /// - The HTTP request fails.
342    /// - The response has a non-success HTTP status code.
343    /// - The response body cannot be deserialized to type `T`.
344    /// - The request is canceled.
345    pub async fn send_post_request<T, B>(
346        &self,
347        endpoint: &str,
348        body: &B,
349    ) -> Result<T, DydxHttpError>
350    where
351        T: DeserializeOwned,
352        B: Serialize,
353    {
354        let url = format!("{}{endpoint}", self.base_url);
355
356        let body_bytes = serde_json::to_vec(body).map_err(|e| DydxHttpError::Serialization {
357            error: e.to_string(),
358        })?;
359
360        let operation = || async {
361            let request = self
362                .client
363                .request_with_ustr_keys(
364                    Method::POST,
365                    url.clone(),
366                    None, // No params
367                    None, // No additional headers (content-type handled by body)
368                    Some(body_bytes.clone()),
369                    None, // Use default timeout
370                    None, // No specific rate limit keys (using global quota)
371                )
372                .await
373                .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
374
375            if !request.status.is_success() {
376                return Err(DydxHttpError::HttpStatus {
377                    status: request.status.as_u16(),
378                    message: String::from_utf8_lossy(&request.body).to_string(),
379                });
380            }
381
382            Ok(request)
383        };
384
385        // Retry strategy (same as GET requests)
386        let should_retry = |error: &DydxHttpError| -> bool {
387            match error {
388                DydxHttpError::HttpClientError(_) => true,
389                DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
390                _ => false,
391            }
392        };
393
394        let create_error = |msg: String| -> DydxHttpError {
395            if msg == "canceled" {
396                DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
397            } else if msg.contains("Timed out") {
398                // Timeouts are transient — map to HttpClientError so they are retried
399                DydxHttpError::HttpClientError(msg)
400            } else {
401                DydxHttpError::ValidationError(msg)
402            }
403        };
404
405        let response = self
406            .retry_manager
407            .execute_with_retry_with_cancel(
408                endpoint,
409                operation,
410                should_retry,
411                create_error,
412                &self.cancellation_token,
413            )
414            .await?;
415
416        serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
417            error: e.to_string(),
418            body: String::from_utf8_lossy(&response.body).to_string(),
419        })
420    }
421
422    /// Fetch all perpetual markets from dYdX.
423    ///
424    /// # Errors
425    ///
426    /// Returns an error if the HTTP request fails or response parsing fails.
427    pub async fn get_markets(&self) -> Result<super::models::MarketsResponse, DydxHttpError> {
428        self.send_request(Method::GET, "/v4/perpetualMarkets", None)
429            .await
430    }
431
432    /// Fetch a single perpetual market by ticker.
433    ///
434    /// Uses the `market` query parameter for efficient single-market fetch.
435    ///
436    /// # Errors
437    ///
438    /// Returns an error if the HTTP request fails or response parsing fails.
439    pub async fn get_market(
440        &self,
441        ticker: &str,
442    ) -> Result<super::models::MarketsResponse, DydxHttpError> {
443        let query = format!("ticker={ticker}");
444        self.send_request(Method::GET, "/v4/perpetualMarkets", Some(&query))
445            .await
446    }
447
448    /// Fetch orderbook for a specific market.
449    ///
450    /// # Errors
451    ///
452    /// Returns an error if the HTTP request fails or response parsing fails.
453    pub async fn get_orderbook(
454        &self,
455        ticker: &str,
456    ) -> Result<super::models::OrderbookResponse, DydxHttpError> {
457        let endpoint = format!("/v4/orderbooks/perpetualMarket/{ticker}");
458        self.send_request(Method::GET, &endpoint, None).await
459    }
460
461    /// Fetch recent trades for a market.
462    ///
463    /// # Errors
464    ///
465    /// Returns an error if the HTTP request fails or response parsing fails.
466    pub async fn get_trades(
467        &self,
468        ticker: &str,
469        limit: Option<u32>,
470        starting_before_or_at_height: Option<u64>,
471    ) -> Result<super::models::TradesResponse, DydxHttpError> {
472        let endpoint = format!("/v4/trades/perpetualMarket/{ticker}");
473        let mut query_parts = Vec::new();
474
475        if let Some(l) = limit {
476            query_parts.push(format!("limit={l}"));
477        }
478
479        if let Some(height) = starting_before_or_at_height {
480            query_parts.push(format!("createdBeforeOrAtHeight={height}"));
481        }
482        let query = if query_parts.is_empty() {
483            None
484        } else {
485            Some(query_parts.join("&"))
486        };
487        self.send_request(Method::GET, &endpoint, query.as_deref())
488            .await
489    }
490
491    /// Fetch candles/klines for a market.
492    ///
493    /// # Errors
494    ///
495    /// Returns an error if the HTTP request fails or response parsing fails.
496    pub async fn get_candles(
497        &self,
498        ticker: &str,
499        resolution: DydxCandleResolution,
500        limit: Option<u32>,
501        from_iso: Option<DateTime<Utc>>,
502        to_iso: Option<DateTime<Utc>>,
503    ) -> Result<super::models::CandlesResponse, DydxHttpError> {
504        let endpoint = format!("/v4/candles/perpetualMarkets/{ticker}");
505        let mut query_parts = vec![format!("resolution={resolution}")];
506
507        if let Some(l) = limit {
508            query_parts.push(format!("limit={l}"));
509        }
510
511        if let Some(from) = from_iso {
512            let from_str = from.to_rfc3339();
513            query_parts.push(format!("fromISO={}", urlencoding::encode(&from_str)));
514        }
515
516        if let Some(to) = to_iso {
517            let to_str = to.to_rfc3339();
518            query_parts.push(format!("toISO={}", urlencoding::encode(&to_str)));
519        }
520        let query = query_parts.join("&");
521        self.send_request(Method::GET, &endpoint, Some(&query))
522            .await
523    }
524
525    /// Fetch subaccount information.
526    ///
527    /// # Errors
528    ///
529    /// Returns an error if the HTTP request fails or response parsing fails.
530    pub async fn get_subaccount(
531        &self,
532        address: &str,
533        subaccount_number: u32,
534    ) -> Result<super::models::SubaccountResponse, DydxHttpError> {
535        let endpoint = format!("/v4/addresses/{address}/subaccountNumber/{subaccount_number}");
536        self.send_request(Method::GET, &endpoint, None).await
537    }
538
539    /// Fetch fills for a subaccount.
540    ///
541    /// # Errors
542    ///
543    /// Returns an error if the HTTP request fails or response parsing fails.
544    pub async fn get_fills(
545        &self,
546        address: &str,
547        subaccount_number: u32,
548        market: Option<&str>,
549        limit: Option<u32>,
550    ) -> Result<super::models::FillsResponse, DydxHttpError> {
551        let endpoint = "/v4/fills";
552        let mut query_parts = vec![
553            format!("address={address}"),
554            format!("subaccountNumber={subaccount_number}"),
555        ];
556
557        if let Some(m) = market {
558            query_parts.push(format!("market={m}"));
559            query_parts.push("marketType=PERPETUAL".to_string());
560        }
561
562        if let Some(l) = limit {
563            query_parts.push(format!("limit={l}"));
564        }
565        let query = query_parts.join("&");
566        self.send_request(Method::GET, endpoint, Some(&query)).await
567    }
568
569    /// Fetch orders for a subaccount.
570    ///
571    /// # Errors
572    ///
573    /// Returns an error if the HTTP request fails or response parsing fails.
574    pub async fn get_orders(
575        &self,
576        address: &str,
577        subaccount_number: u32,
578        market: Option<&str>,
579        limit: Option<u32>,
580    ) -> Result<super::models::OrdersResponse, DydxHttpError> {
581        let endpoint = "/v4/orders";
582        let mut query_parts = vec![
583            format!("address={address}"),
584            format!("subaccountNumber={subaccount_number}"),
585        ];
586
587        if let Some(m) = market {
588            query_parts.push(format!("market={m}"));
589            query_parts.push("marketType=PERPETUAL".to_string());
590        }
591
592        if let Some(l) = limit {
593            query_parts.push(format!("limit={l}"));
594        }
595        let query = query_parts.join("&");
596        self.send_request(Method::GET, endpoint, Some(&query)).await
597    }
598
599    /// Fetch transfers for a subaccount.
600    ///
601    /// # Errors
602    ///
603    /// Returns an error if the HTTP request fails or response parsing fails.
604    pub async fn get_transfers(
605        &self,
606        address: &str,
607        subaccount_number: u32,
608        limit: Option<u32>,
609    ) -> Result<super::models::TransfersResponse, DydxHttpError> {
610        let endpoint = "/v4/transfers";
611        let mut query_parts = vec![
612            format!("address={address}"),
613            format!("subaccountNumber={subaccount_number}"),
614        ];
615
616        if let Some(l) = limit {
617            query_parts.push(format!("limit={l}"));
618        }
619        let query = query_parts.join("&");
620        self.send_request(Method::GET, endpoint, Some(&query)).await
621    }
622
623    /// Get current server time.
624    ///
625    /// # Errors
626    ///
627    /// Returns an error if the HTTP request fails or response parsing fails.
628    pub async fn get_time(&self) -> Result<super::models::TimeResponse, DydxHttpError> {
629        self.send_request(Method::GET, "/v4/time", None).await
630    }
631
632    /// Get current blockchain height.
633    ///
634    /// # Errors
635    ///
636    /// Returns an error if the HTTP request fails or response parsing fails.
637    pub async fn get_height(&self) -> Result<super::models::HeightResponse, DydxHttpError> {
638        self.send_request(Method::GET, "/v4/height", None).await
639    }
640}
641
/// Provides a higher-level HTTP client for the [dYdX v4](https://dydx.exchange) Indexer REST API.
///
/// This client wraps the underlying `DydxRawHttpClient` to handle conversions
/// into the Nautilus domain model, following the two-layer pattern established
/// in OKX, Bybit, and BitMEX adapters.
///
/// **Architecture:**
/// - **Raw client** (`DydxRawHttpClient`): Low-level HTTP methods matching dYdX Indexer API endpoints.
/// - **Domain client** (`DydxHttpClient`): High-level methods using Nautilus domain types.
///
/// The domain client:
/// - Wraps the raw client in an `Arc` for efficient cloning (required for Python bindings).
/// - Maintains an instrument cache using `DashMap` for thread-safe concurrent access.
/// - Provides standard cache methods: `cache_instruments()`, `cache_instrument()`, `get_instrument()`.
/// - Tracks cache initialization state for optimizations.
///
/// Cloning is cheap: clones share the same raw client and the same instrument cache.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx", from_py_object)
)]
pub struct DydxHttpClient {
    /// Raw HTTP client wrapped in Arc for efficient cloning.
    pub(crate) inner: Arc<DydxRawHttpClient>,
    /// Shared instrument cache with multiple lookup indices.
    ///
    /// This cache is shared across HTTP client, WebSocket client, and execution client.
    /// It provides O(1) lookups by symbol, market ticker, or clob_pair_id.
    pub(crate) instrument_cache: Arc<InstrumentCache>,
    /// Process-wide realtime atomic clock obtained at construction.
    ///
    /// NOTE(review): presumably the source of `ts_init` timestamps — the
    /// `generate_ts_init` helper is outside this section; confirm.
    clock: &'static AtomicTime,
}
672
673impl Clone for DydxHttpClient {
674    fn clone(&self) -> Self {
675        Self {
676            inner: self.inner.clone(),
677            instrument_cache: Arc::clone(&self.instrument_cache),
678            clock: self.clock,
679        }
680    }
681}
682
683impl Default for DydxHttpClient {
684    fn default() -> Self {
685        Self::new(None, Some(60), None, false, None)
686            .expect("Failed to create default DydxHttpClient")
687    }
688}
689
690impl DydxHttpClient {
691    /// Creates a new [`DydxHttpClient`] using the default dYdX Indexer HTTP URL,
692    /// optionally overridden with a custom base URL.
693    ///
694    /// This constructor creates its own internal instrument cache. For shared caching
695    /// across multiple clients, use [`new_with_cache`](Self::new_with_cache) instead.
696    ///
697    /// **Note**: No credentials are required as the dYdX Indexer API is publicly accessible.
698    /// Order submission and trading operations use gRPC with blockchain transaction signing.
699    ///
700    /// # Errors
701    ///
702    /// Returns an error if the underlying HTTP client or retry manager cannot be created.
703    pub fn new(
704        base_url: Option<String>,
705        timeout_secs: Option<u64>,
706        proxy_url: Option<String>,
707        is_testnet: bool,
708        retry_config: Option<RetryConfig>,
709    ) -> anyhow::Result<Self> {
710        Self::new_with_cache(
711            base_url,
712            timeout_secs,
713            proxy_url,
714            is_testnet,
715            retry_config,
716            Arc::new(InstrumentCache::new()),
717        )
718    }
719
720    /// Creates a new [`DydxHttpClient`] with a shared instrument cache.
721    ///
722    /// Use this constructor when sharing instrument data between HTTP client,
723    /// WebSocket client, and execution client.
724    ///
725    /// # Arguments
726    ///
727    /// * `instrument_cache` - Shared instrument cache for lookups by symbol, ticker, or clob_pair_id
728    ///
729    /// # Errors
730    ///
731    /// Returns an error if the underlying HTTP client or retry manager cannot be created.
732    pub fn new_with_cache(
733        base_url: Option<String>,
734        timeout_secs: Option<u64>,
735        proxy_url: Option<String>,
736        is_testnet: bool,
737        retry_config: Option<RetryConfig>,
738        instrument_cache: Arc<InstrumentCache>,
739    ) -> anyhow::Result<Self> {
740        Ok(Self {
741            inner: Arc::new(DydxRawHttpClient::new(
742                base_url,
743                timeout_secs,
744                proxy_url,
745                is_testnet,
746                retry_config,
747            )?),
748            instrument_cache,
749            clock: get_atomic_clock_realtime(),
750        })
751    }
752
753    /// Requests instruments from the dYdX Indexer API and returns Nautilus domain types.
754    ///
755    /// This method does NOT automatically cache results. Use `fetch_and_cache_instruments()`
756    /// for automatic caching, or call `cache_instruments()` manually with the results.
757    ///
758    /// # Errors
759    ///
760    /// Returns an error if the HTTP request or parsing fails.
761    /// Individual instrument parsing errors are logged as warnings.
762    pub async fn request_instruments(
763        &self,
764        symbol: Option<String>,
765        maker_fee: Option<Decimal>,
766        taker_fee: Option<Decimal>,
767    ) -> anyhow::Result<Vec<InstrumentAny>> {
768        let markets_response = self.inner.get_markets().await?;
769        let ts_init = self.generate_ts_init();
770
771        let mut instruments = Vec::new();
772        let mut skipped_inactive = 0;
773
774        for (ticker, market) in markets_response.markets {
775            // Filter by symbol if specified
776            if let Some(ref sym) = symbol
777                && ticker != *sym
778            {
779                continue;
780            }
781
782            if !super::parse::is_market_active(&market.status) {
783                log::debug!(
784                    "Skipping inactive market {ticker} (status: {:?})",
785                    market.status
786                );
787                skipped_inactive += 1;
788                continue;
789            }
790
791            match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
792                Ok(instrument) => {
793                    instruments.push(instrument);
794                }
795                Err(e) => {
796                    log::error!("Failed to parse instrument {ticker}: {e}");
797                }
798            }
799        }
800
801        if skipped_inactive > 0 {
802            log::info!(
803                "Parsed {} instruments, skipped {} inactive",
804                instruments.len(),
805                skipped_inactive
806            );
807        } else {
808            log::debug!("Parsed {} instruments", instruments.len());
809        }
810
811        Ok(instruments)
812    }
813
814    /// Fetches instruments from the API and caches them.
815    ///
816    /// This is a convenience method that fetches instruments and populates both
817    /// the symbol-based and CLOB pair ID-based caches.
818    ///
819    /// On success, existing caches are cleared and repopulated atomically.
820    /// On failure, existing caches are preserved (no partial updates).
821    ///
822    /// # Errors
823    ///
824    /// Returns an error if the HTTP request fails.
825    pub async fn fetch_and_cache_instruments(&self) -> anyhow::Result<()> {
826        // Fetch first - preserve existing cache on network failure
827        let markets_response = self.inner.get_markets().await?;
828        let ts_init = self.generate_ts_init();
829
830        let mut parsed_instruments = Vec::new();
831        let mut parsed_markets = Vec::new();
832        let mut skipped_inactive = 0;
833
834        for (ticker, market) in markets_response.markets {
835            if !super::parse::is_market_active(&market.status) {
836                log::debug!(
837                    "Skipping inactive market {ticker} (status: {:?})",
838                    market.status
839                );
840                skipped_inactive += 1;
841                continue;
842            }
843
844            match super::parse::parse_instrument_any(&market, None, None, ts_init) {
845                Ok(instrument) => {
846                    parsed_instruments.push(instrument);
847                    parsed_markets.push(market);
848                }
849                Err(e) => {
850                    log::error!("Failed to parse instrument {ticker}: {e}");
851                }
852            }
853        }
854
855        // Only clear and repopulate cache after successful fetch and parse
856        self.instrument_cache.clear();
857
858        // Zip instruments with their market data for bulk insert
859        let items: Vec<_> = parsed_instruments.into_iter().zip(parsed_markets).collect();
860
861        if !items.is_empty() {
862            self.instrument_cache.insert_many(items.clone());
863        }
864
865        let count = items.len();
866
867        if skipped_inactive > 0 {
868            log::info!("Cached {count} instruments, skipped {skipped_inactive} inactive");
869        } else {
870            log::info!("Cached {count} instruments");
871        }
872
873        Ok(())
874    }
875
876    /// Fetches a single instrument by ticker and caches it.
877    ///
878    /// # Errors
879    ///
880    /// Returns an error if the HTTP request fails.
881    pub async fn fetch_and_cache_single_instrument(
882        &self,
883        ticker: &str,
884    ) -> anyhow::Result<Option<InstrumentAny>> {
885        let markets_response = self.inner.get_market(ticker).await?;
886        let ts_init = self.generate_ts_init();
887
888        // The API returns all markets if ticker not found, so check specifically
889        if let Some(market) = markets_response.markets.get(ticker) {
890            if !super::parse::is_market_active(&market.status) {
891                log::debug!(
892                    "Skipping inactive market {ticker} (status: {:?})",
893                    market.status
894                );
895                return Ok(None);
896            }
897
898            let instrument = parse_instrument_any(market, None, None, ts_init)?;
899            self.instrument_cache
900                .insert(instrument.clone(), market.clone());
901
902            log::info!("Fetched and cached new instrument: {ticker}");
903            return Ok(Some(instrument));
904        }
905
906        Ok(None)
907    }
908
    /// Caches multiple instruments (symbol lookup only).
    ///
    /// Delegates to the cache's `insert_instruments_only`, so no market data is
    /// stored alongside the instruments. Use `fetch_and_cache_instruments()` for
    /// full caching with market params.
    /// Any existing instruments with the same symbols will be replaced.
    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
        self.instrument_cache.insert_instruments_only(instruments);
    }
916
    /// Caches a single instrument (symbol lookup only).
    ///
    /// Delegates to the cache's `insert_instrument_only`, so no market data is
    /// stored alongside the instrument. Use `fetch_and_cache_instruments()` for
    /// full caching with market params.
    /// Any existing instrument with the same symbol will be replaced.
    pub fn cache_instrument(&self, instrument: InstrumentAny) {
        self.instrument_cache.insert_instrument_only(instrument);
    }
924
    /// Gets an instrument from the cache by `InstrumentId`.
    ///
    /// Returns `None` if no instrument with that ID is cached.
    #[must_use]
    pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
        self.instrument_cache.get(instrument_id)
    }
930
    /// Gets an instrument by CLOB pair ID.
    ///
    /// Only works for instruments cached via `fetch_and_cache_instruments()`.
    /// NOTE(review): `fetch_and_cache_single_instrument()` also inserts the
    /// instrument together with its market, so it presumably populates this
    /// lookup as well — confirm against `InstrumentCache::insert`.
    ///
    /// Returns `None` if the lookup finds nothing.
    #[must_use]
    pub fn get_instrument_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
        self.instrument_cache.get_by_clob_id(clob_pair_id)
    }
938
    /// Gets an instrument by market ticker (e.g., "BTC-USD").
    ///
    /// Only works for instruments cached via `fetch_and_cache_instruments()`.
    /// NOTE(review): `fetch_and_cache_single_instrument()` also inserts the
    /// instrument together with its market, so it presumably populates this
    /// lookup as well — confirm against `InstrumentCache::insert`.
    ///
    /// Returns `None` if the lookup finds nothing.
    #[must_use]
    pub fn get_instrument_by_market(&self, ticker: &str) -> Option<InstrumentAny> {
        self.instrument_cache.get_by_market(ticker)
    }
946
    /// Gets market parameters for order submission from the cached market data.
    ///
    /// Returns the quantization parameters needed by OrderBuilder to construct
    /// properly formatted orders for the dYdX v4 protocol.
    ///
    /// # Returns
    ///
    /// The cached `PerpetualMarket` for `instrument_id`, or `None` if the
    /// instrument is not found in the market params cache. This method does not
    /// return an error.
    #[must_use]
    pub fn get_market_params(
        &self,
        instrument_id: &InstrumentId,
    ) -> Option<super::models::PerpetualMarket> {
        self.instrument_cache.get_market_params(instrument_id)
    }
962
963    /// Requests historical trades for a symbol.
964    ///
965    /// Fetches trade data from the dYdX Indexer API's `/v4/trades/perpetualMarket/:ticker` endpoint.
966    /// Results are ordered by creation time descending (newest first).
967    ///
968    /// # Errors
969    ///
970    /// Returns an error if the HTTP request fails or response cannot be parsed.
971    pub async fn request_trades(
972        &self,
973        symbol: &str,
974        limit: Option<u32>,
975        starting_before_or_at_height: Option<u64>,
976    ) -> anyhow::Result<super::models::TradesResponse> {
977        self.inner
978            .get_trades(symbol, limit, starting_before_or_at_height)
979            .await
980            .map_err(Into::into)
981    }
982
983    /// Requests historical candles for a symbol.
984    ///
985    /// Fetches candle data from the dYdX Indexer API's `/v4/candles/perpetualMarkets/:ticker` endpoint.
986    /// Results are ordered by start time ascending (oldest first).
987    ///
988    /// # Errors
989    ///
990    /// Returns an error if the HTTP request fails or response cannot be parsed.
991    pub async fn request_candles(
992        &self,
993        symbol: &str,
994        resolution: DydxCandleResolution,
995        limit: Option<u32>,
996        from_iso: Option<DateTime<Utc>>,
997        to_iso: Option<DateTime<Utc>>,
998    ) -> anyhow::Result<super::models::CandlesResponse> {
999        self.inner
1000            .get_candles(symbol, resolution, limit, from_iso, to_iso)
1001            .await
1002            .map_err(Into::into)
1003    }
1004
1005    /// Requests historical bars for an instrument with optional pagination.
1006    ///
1007    /// Fetches candle data from the dYdX Indexer API and converts to Nautilus
1008    /// `Bar` objects. Supports time-chunked pagination for large date ranges.
1009    ///
1010    /// The resolution is derived internally from `bar_type` (no need to pass
1011    /// `DydxCandleResolution`). Incomplete bars (where `ts_event >= now`) are
1012    /// filtered out.
1013    ///
1014    /// Results are returned in chronological order (oldest first).
1015    ///
1016    /// # Errors
1017    ///
1018    /// Returns an error if:
1019    /// - The bar type uses unsupported aggregation/price type.
1020    /// - The HTTP request fails or response cannot be parsed.
1021    /// - The instrument is not found in the cache.
1022    pub async fn request_bars(
1023        &self,
1024        bar_type: BarType,
1025        start: Option<DateTime<Utc>>,
1026        end: Option<DateTime<Utc>>,
1027        limit: Option<u32>,
1028        timestamp_on_close: bool,
1029    ) -> anyhow::Result<Vec<Bar>> {
1030        let resolution = bar_type_to_resolution(&bar_type)?;
1031        let instrument_id = bar_type.instrument_id();
1032
1033        let instrument = self
1034            .get_instrument(&instrument_id)
1035            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1036
1037        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1038        let price_precision = instrument.price_precision();
1039        let size_precision = instrument.size_precision();
1040        let ts_init = self.generate_ts_init();
1041
1042        let mut all_bars: Vec<Bar> = Vec::new();
1043
1044        // Determine bar duration in seconds for pagination chunking
1045        let spec = bar_type.spec();
1046        let bar_secs: i64 = match spec.aggregation {
1047            BarAggregation::Minute => spec.step.get() as i64 * 60,
1048            BarAggregation::Hour => spec.step.get() as i64 * 3_600,
1049            BarAggregation::Day => spec.step.get() as i64 * 86_400,
1050            _ => anyhow::bail!("Unsupported aggregation: {:?}", spec.aggregation),
1051        };
1052
1053        match (start, end) {
1054            // Time-chunked pagination for date ranges
1055            (Some(range_start), Some(range_end)) if range_end > range_start => {
1056                let overall_limit = limit.unwrap_or(u32::MAX);
1057                let mut remaining = overall_limit;
1058                let bars_per_call = DYDX_MAX_BARS_PER_REQUEST.min(remaining);
1059                let chunk_duration = chrono::Duration::seconds(bar_secs * bars_per_call as i64);
1060                let mut chunk_start = range_start;
1061
1062                while chunk_start < range_end && remaining > 0 {
1063                    let chunk_end = (chunk_start + chunk_duration).min(range_end);
1064                    let per_call_limit = remaining.min(DYDX_MAX_BARS_PER_REQUEST);
1065
1066                    let response = self
1067                        .inner
1068                        .get_candles(
1069                            ticker,
1070                            resolution,
1071                            Some(per_call_limit),
1072                            Some(chunk_start),
1073                            Some(chunk_end),
1074                        )
1075                        .await?;
1076
1077                    let count = response.candles.len() as u32;
1078                    if count == 0 {
1079                        break;
1080                    }
1081
1082                    for candle in &response.candles {
1083                        match super::parse::parse_bar(
1084                            candle,
1085                            bar_type,
1086                            price_precision,
1087                            size_precision,
1088                            timestamp_on_close,
1089                            ts_init,
1090                        ) {
1091                            Ok(bar) => all_bars.push(bar),
1092                            Err(e) => log::warn!("Failed to parse candle for {instrument_id}: {e}"),
1093                        }
1094                    }
1095
1096                    if remaining <= count {
1097                        break;
1098                    }
1099                    remaining -= count;
1100                    chunk_start += chunk_duration;
1101                }
1102            }
1103            // Single request (no date range or invalid range)
1104            _ => {
1105                let req_limit = limit.unwrap_or(DYDX_MAX_BARS_PER_REQUEST);
1106                let response = self
1107                    .inner
1108                    .get_candles(ticker, resolution, Some(req_limit), None, None)
1109                    .await?;
1110
1111                for candle in &response.candles {
1112                    match super::parse::parse_bar(
1113                        candle,
1114                        bar_type,
1115                        price_precision,
1116                        size_precision,
1117                        timestamp_on_close,
1118                        ts_init,
1119                    ) {
1120                        Ok(bar) => all_bars.push(bar),
1121                        Err(e) => log::warn!("Failed to parse candle for {instrument_id}: {e}"),
1122                    }
1123                }
1124            }
1125        }
1126
1127        // Filter incomplete bars (ts_event >= current time)
1128        let current_time_ns = self.generate_ts_init();
1129        all_bars.retain(|bar| bar.ts_event < current_time_ns);
1130
1131        Ok(all_bars)
1132    }
1133
1134    /// Requests historical trade ticks for an instrument with optional pagination.
1135    ///
1136    /// Fetches trade data from the dYdX Indexer API and converts them to Nautilus
1137    /// `TradeTick` objects. Supports cursor-based pagination using block height
1138    /// and client-side time filtering (the dYdX API has no timestamp filter).
1139    ///
1140    /// Results are returned in chronological order (oldest first).
1141    ///
1142    /// # Errors
1143    ///
1144    /// Returns an error if the HTTP request fails, response cannot be parsed,
1145    /// or the instrument is not found in the cache.
1146    ///
1147    /// # Panics
1148    ///
1149    /// This function will panic if the API returns a non-empty trades response
1150    /// but `last()` on the trades vector returns `None` (should never happen).
1151    pub async fn request_trade_ticks(
1152        &self,
1153        instrument_id: InstrumentId,
1154        start: Option<DateTime<Utc>>,
1155        end: Option<DateTime<Utc>>,
1156        limit: Option<u32>,
1157    ) -> anyhow::Result<Vec<TradeTick>> {
1158        const DYDX_MAX_TRADES_PER_REQUEST: u32 = 1_000;
1159        const DYDX_BLOCK_TIME_SECS: f64 = 1.1;
1160
1161        // Validation
1162        if let (Some(s), Some(e)) = (start, end) {
1163            anyhow::ensure!(s < e, "start ({s}) must be before end ({e})");
1164        }
1165
1166        let instrument = self
1167            .get_instrument(&instrument_id)
1168            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1169
1170        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1171        let price_precision = instrument.price_precision();
1172        let size_precision = instrument.size_precision();
1173        let ts_init = self.generate_ts_init();
1174
1175        // When an end time is provided, estimate the block height at that time
1176        // so we can skip directly to the relevant window instead of paginating
1177        // from the latest trade backward (which can be extremely slow for liquid markets).
1178        let initial_cursor = if let Some(end_time) = end {
1179            match self.inner.get_height().await {
1180                Ok(height_resp) => {
1181                    let secs_ahead = (height_resp.time - end_time).num_seconds();
1182                    if secs_ahead > 0 {
1183                        let blocks_to_skip = (secs_ahead as f64 / DYDX_BLOCK_TIME_SECS) as u64;
1184                        let target = height_resp.height.saturating_sub(blocks_to_skip);
1185                        log::debug!(
1186                            "Estimated block height at {end_time}: {target} \
1187                             (current: {}, skipping ~{blocks_to_skip} blocks)",
1188                            height_resp.height,
1189                        );
1190                        Some(target)
1191                    } else {
1192                        None // end_time is in the future, start from latest
1193                    }
1194                }
1195                Err(e) => {
1196                    log::warn!(
1197                        "Failed to get block height for time skip, paginating from latest: {e}"
1198                    );
1199                    None
1200                }
1201            }
1202        } else {
1203            None
1204        };
1205
1206        let overall_limit = limit.unwrap_or(u32::MAX);
1207        let mut remaining = overall_limit;
1208        let mut cursor_height: Option<u64> = initial_cursor;
1209        let mut all_trades = Vec::new();
1210
1211        loop {
1212            let page_limit = remaining.min(DYDX_MAX_TRADES_PER_REQUEST);
1213            let response = self
1214                .inner
1215                .get_trades(ticker, Some(page_limit), cursor_height)
1216                .await?;
1217
1218            let page_count = response.trades.len() as u32;
1219            if page_count == 0 {
1220                break;
1221            }
1222
1223            // Trades come newest-first; oldest is last
1224            let oldest_trade = response.trades.last().unwrap();
1225
1226            // Update cursor for next page (go further back in time)
1227            cursor_height = Some(oldest_trade.created_at_height.saturating_sub(1));
1228
1229            // Break if we've reached before the start boundary
1230            if let Some(s) = start
1231                && oldest_trade.created_at < s
1232            {
1233                // This page contains trades before start — filter and stop
1234                for trade in &response.trades {
1235                    if start.is_some_and(|s| trade.created_at < s) {
1236                        continue;
1237                    }
1238
1239                    if end.is_some_and(|e| trade.created_at > e) {
1240                        continue;
1241                    }
1242                    all_trades.push(super::parse::parse_trade_tick(
1243                        trade,
1244                        instrument_id,
1245                        price_precision,
1246                        size_precision,
1247                        ts_init,
1248                    )?);
1249                }
1250                break;
1251            }
1252
1253            // Convert all trades in this page (with time filtering)
1254            for trade in &response.trades {
1255                if start.is_some_and(|s| trade.created_at < s) {
1256                    continue;
1257                }
1258
1259                if end.is_some_and(|e| trade.created_at > e) {
1260                    continue;
1261                }
1262                all_trades.push(super::parse::parse_trade_tick(
1263                    trade,
1264                    instrument_id,
1265                    price_precision,
1266                    size_precision,
1267                    ts_init,
1268                )?);
1269            }
1270
1271            remaining = remaining.saturating_sub(page_count);
1272
1273            // Break on partial page (no more data) or limit reached
1274            if page_count < page_limit || remaining == 0 {
1275                break;
1276            }
1277        }
1278
1279        // Reverse to chronological order (oldest first) and dedup
1280        all_trades.reverse();
1281        all_trades.dedup_by(|a, b| a.trade_id == b.trade_id);
1282
1283        // Truncate to requested limit
1284        if let Some(lim) = limit {
1285            all_trades.truncate(lim as usize);
1286        }
1287
1288        Ok(all_trades)
1289    }
1290
1291    /// Requests an order book snapshot for a symbol.
1292    ///
1293    /// Fetches order book data from the dYdX Indexer API and converts it to Nautilus
1294    /// `OrderBookDeltas`. The snapshot is represented as a sequence of deltas starting
1295    /// with a CLEAR action followed by ADD actions for each level.
1296    ///
1297    /// # Errors
1298    ///
1299    /// Returns an error if the HTTP request fails, response cannot be parsed,
1300    /// or the instrument is not found in the cache.
1301    pub async fn request_orderbook_snapshot(
1302        &self,
1303        instrument_id: InstrumentId,
1304    ) -> anyhow::Result<OrderBookDeltas> {
1305        let instrument = self
1306            .get_instrument(&instrument_id)
1307            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1308
1309        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1310        let response = self.inner.get_orderbook(ticker).await?;
1311
1312        let ts_init = self.generate_ts_init();
1313
1314        let mut deltas = Vec::with_capacity(1 + response.bids.len() + response.asks.len());
1315
1316        deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1317
1318        for (i, level) in response.bids.iter().enumerate() {
1319            let is_last = i == response.bids.len() - 1 && response.asks.is_empty();
1320            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1321
1322            let order = BookOrder::new(
1323                NautilusOrderSide::Buy,
1324                Price::from_decimal_dp(level.price, instrument.price_precision())?,
1325                Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1326                0,
1327            );
1328
1329            deltas.push(OrderBookDelta::new(
1330                instrument_id,
1331                BookAction::Add,
1332                order,
1333                flags,
1334                0,
1335                ts_init,
1336                ts_init,
1337            ));
1338        }
1339
1340        for (i, level) in response.asks.iter().enumerate() {
1341            let is_last = i == response.asks.len() - 1;
1342            let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1343
1344            let order = BookOrder::new(
1345                NautilusOrderSide::Sell,
1346                Price::from_decimal_dp(level.price, instrument.price_precision())?,
1347                Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1348                0,
1349            );
1350
1351            deltas.push(OrderBookDelta::new(
1352                instrument_id,
1353                BookAction::Add,
1354                order,
1355                flags,
1356                0,
1357                ts_init,
1358                ts_init,
1359            ));
1360        }
1361
1362        Ok(OrderBookDeltas::new(instrument_id, deltas))
1363    }
1364
    /// Exposes the raw HTTP client for testing and advanced use cases.
    ///
    /// This provides access to the underlying [`DydxRawHttpClient`] for cases
    /// where low-level API access is needed. Most users should use the domain
    /// client methods instead.
    ///
    /// Returns a reference to the shared `Arc`; clone it to obtain an owned handle.
    #[must_use]
    pub fn raw_client(&self) -> &Arc<DydxRawHttpClient> {
        &self.inner
    }
1374
    /// Returns `true` if this client is configured for testnet.
    ///
    /// Delegates to the raw client's `is_testnet`.
    #[must_use]
    pub fn is_testnet(&self) -> bool {
        self.inner.is_testnet()
    }
1380
    /// Returns the base URL being used by this client.
    ///
    /// Delegates to the raw client's `base_url`.
    #[must_use]
    pub fn base_url(&self) -> &str {
        self.inner.base_url()
    }
1386
    /// Returns `true` if the instrument cache reports itself as initialized.
    ///
    /// Delegates to the cache's `is_initialized`.
    #[must_use]
    pub fn is_cache_initialized(&self) -> bool {
        self.instrument_cache.is_initialized()
    }
1392
    /// Returns the number of instruments currently cached.
    ///
    /// Delegates to the cache's `len`.
    #[must_use]
    pub fn cached_instruments_count(&self) -> usize {
        self.instrument_cache.len()
    }
1398
    /// Returns a reference to the shared instrument cache.
    ///
    /// The cache provides lookups by symbol, market ticker, and clob_pair_id.
    /// The returned value is a reference to the `Arc`; clone it to obtain an
    /// owned handle to the same cache.
    #[must_use]
    pub fn instrument_cache(&self) -> &Arc<InstrumentCache> {
        &self.instrument_cache
    }
1406
    /// Returns all cached instruments.
    ///
    /// This is a convenience method that collects all instruments into a `Vec`
    /// by delegating to the cache's `all_instruments`.
    #[must_use]
    pub fn all_instruments(&self) -> Vec<InstrumentAny> {
        self.instrument_cache.all_instruments()
    }
1414
    /// Returns all cached instrument IDs.
    ///
    /// Delegates to the cache's `all_instrument_ids`.
    #[must_use]
    pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
        self.instrument_cache.all_instrument_ids()
    }
1420
    /// Returns the client clock's current time in nanoseconds.
    ///
    /// Used as the `ts_init` timestamp for objects produced by this client.
    fn generate_ts_init(&self) -> UnixNanos {
        self.clock.get_time_ns()
    }
1424
1425    /// Requests order status reports for a subaccount.
1426    ///
1427    /// Fetches orders from the dYdX Indexer API and converts them to Nautilus
1428    /// `OrderStatusReport` objects.
1429    ///
1430    /// # Errors
1431    ///
1432    /// Returns an error if the HTTP request fails or parsing fails.
1433    pub async fn request_order_status_reports(
1434        &self,
1435        address: &str,
1436        subaccount_number: u32,
1437        account_id: AccountId,
1438        instrument_id: Option<InstrumentId>,
1439    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1440        let ts_init = self.generate_ts_init();
1441
1442        // Convert instrument_id to market filter
1443        let market = instrument_id.map(|id| {
1444            let symbol = id.symbol.to_string();
1445            // Remove -PERP suffix if present to get the dYdX market format (e.g., ETH-USD)
1446            symbol.trim_end_matches("-PERP").to_string()
1447        });
1448
1449        let orders = self
1450            .inner
1451            .get_orders(address, subaccount_number, market.as_deref(), None)
1452            .await?;
1453
1454        let mut reports = Vec::new();
1455
1456        for order in orders {
1457            // Get instrument by clob_pair_id
1458            let instrument = match self.get_instrument_by_clob_id(order.clob_pair_id) {
1459                Some(inst) => inst,
1460                None => {
1461                    log::warn!(
1462                        "Skipping order {}: no cached instrument for clob_pair_id {}",
1463                        order.id,
1464                        order.clob_pair_id
1465                    );
1466                    continue;
1467                }
1468            };
1469
1470            // Filter by instrument_id if specified
1471            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1472                continue;
1473            }
1474
1475            match super::parse::parse_order_status_report(&order, &instrument, account_id, ts_init)
1476            {
1477                Ok(report) => reports.push(report),
1478                Err(e) => {
1479                    log::warn!("Failed to parse order {}: {e}", order.id);
1480                }
1481            }
1482        }
1483
1484        Ok(reports)
1485    }
1486
1487    /// Requests fill reports for a subaccount.
1488    ///
1489    /// Fetches fills from the dYdX Indexer API and converts them to Nautilus
1490    /// `FillReport` objects.
1491    ///
1492    /// # Errors
1493    ///
1494    /// Returns an error if the HTTP request fails or parsing fails.
1495    pub async fn request_fill_reports(
1496        &self,
1497        address: &str,
1498        subaccount_number: u32,
1499        account_id: AccountId,
1500        instrument_id: Option<InstrumentId>,
1501    ) -> anyhow::Result<Vec<FillReport>> {
1502        let ts_init = self.generate_ts_init();
1503
1504        // Convert instrument_id to market filter
1505        let market = instrument_id.map(|id| {
1506            let symbol = id.symbol.to_string();
1507            symbol.trim_end_matches("-PERP").to_string()
1508        });
1509
1510        let fills_response = self
1511            .inner
1512            .get_fills(address, subaccount_number, market.as_deref(), None)
1513            .await?;
1514
1515        let mut reports = Vec::new();
1516
1517        for fill in fills_response.fills {
1518            // Get instrument by market ticker (e.g., "BTC-USD")
1519            let instrument = match self.get_instrument_by_market(&fill.market) {
1520                Some(inst) => inst,
1521                None => {
1522                    log::warn!(
1523                        "Skipping fill {}: no cached instrument for market {}",
1524                        fill.id,
1525                        fill.market
1526                    );
1527                    continue;
1528                }
1529            };
1530
1531            // Filter by instrument_id if specified
1532            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1533                continue;
1534            }
1535
1536            match super::parse::parse_fill_report(&fill, &instrument, account_id, ts_init) {
1537                Ok(report) => reports.push(report),
1538                Err(e) => {
1539                    log::warn!("Failed to parse fill {}: {e}", fill.id);
1540                }
1541            }
1542        }
1543
1544        Ok(reports)
1545    }
1546
1547    /// Requests position status reports for a subaccount.
1548    ///
1549    /// Fetches positions from the dYdX Indexer API and converts them to Nautilus
1550    /// `PositionStatusReport` objects.
1551    ///
1552    /// # Errors
1553    ///
1554    /// Returns an error if the HTTP request fails or parsing fails.
1555    pub async fn request_position_status_reports(
1556        &self,
1557        address: &str,
1558        subaccount_number: u32,
1559        account_id: AccountId,
1560        instrument_id: Option<InstrumentId>,
1561    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1562        let ts_init = self.generate_ts_init();
1563
1564        let subaccount_response = self
1565            .inner
1566            .get_subaccount(address, subaccount_number)
1567            .await?;
1568
1569        let mut reports = Vec::new();
1570
1571        for (market, position) in subaccount_response.subaccount.open_perpetual_positions {
1572            // Get instrument by market ticker (e.g., "BTC-USD")
1573            let instrument = match self.get_instrument_by_market(&market) {
1574                Some(inst) => inst,
1575                None => {
1576                    log::warn!("Skipping position: no cached instrument for market {market}");
1577                    continue;
1578                }
1579            };
1580
1581            // Filter by instrument_id if specified
1582            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1583                continue;
1584            }
1585
1586            match super::parse::parse_position_status_report(
1587                &position,
1588                &instrument,
1589                account_id,
1590                ts_init,
1591            ) {
1592                Ok(report) => reports.push(report),
1593                Err(e) => {
1594                    log::warn!("Failed to parse position for {market}: {e}");
1595                }
1596            }
1597        }
1598
1599        Ok(reports)
1600    }
1601
1602    /// Requests account state for a subaccount.
1603    ///
1604    /// Fetches the subaccount from the dYdX Indexer API and converts it to a Nautilus
1605    /// `AccountState` with balances and margin calculations.
1606    ///
1607    /// # Errors
1608    ///
1609    /// Returns an error if the HTTP request fails or parsing fails.
1610    pub async fn request_account_state(
1611        &self,
1612        address: &str,
1613        subaccount_number: u32,
1614        account_id: AccountId,
1615    ) -> anyhow::Result<AccountState> {
1616        let ts_init = self.generate_ts_init();
1617        let subaccount_response = self
1618            .inner
1619            .get_subaccount(address, subaccount_number)
1620            .await?;
1621
1622        // Build instruments map from cache
1623        let instruments: HashMap<InstrumentId, InstrumentAny> = self
1624            .instrument_cache
1625            .all_instruments()
1626            .into_iter()
1627            .map(|inst| (inst.id(), inst))
1628            .collect();
1629
1630        // Use current oracle prices from instrument cache (updated via WS)
1631        let oracle_prices = self.instrument_cache.to_oracle_prices_map();
1632
1633        parse_account_state_from_http(
1634            &subaccount_response.subaccount,
1635            account_id,
1636            &instruments,
1637            &oracle_prices,
1638            ts_init,
1639            ts_init,
1640        )
1641    }
1642}
1643
#[cfg(test)]
mod tests {
    use axum::{Router, routing::get};
    use nautilus_model::identifiers::{Symbol, Venue};
    use rstest::rstest;

    use super::*;
    use crate::http::error;

    /// A raw client built without the testnet flag reports the mainnet base URL.
    #[tokio::test]
    async fn test_raw_client_creation() {
        let built = DydxRawHttpClient::new(None, Some(30), None, false, None);
        assert!(built.is_ok());

        let raw = built.unwrap();
        assert!(!raw.is_testnet());
        assert_eq!(raw.base_url(), DYDX_HTTP_URL);
    }

    /// A raw client built with the testnet flag reports the testnet base URL.
    #[tokio::test]
    async fn test_raw_client_testnet() {
        let built = DydxRawHttpClient::new(None, Some(30), None, true, None);
        assert!(built.is_ok());

        let raw = built.unwrap();
        assert!(raw.is_testnet());
        assert_eq!(raw.base_url(), DYDX_TESTNET_HTTP_URL);
    }

    /// A freshly built domain client targets mainnet and starts with an empty,
    /// uninitialized instrument cache.
    #[tokio::test]
    async fn test_domain_client_creation() {
        let built = DydxHttpClient::new(None, Some(30), None, false, None);
        assert!(built.is_ok());

        let domain = built.unwrap();
        assert!(!domain.is_testnet());
        assert_eq!(domain.base_url(), DYDX_HTTP_URL);
        assert!(!domain.is_cache_initialized());
        assert_eq!(domain.cached_instruments_count(), 0);
    }

    /// A domain client built with the testnet flag reports the testnet base URL.
    #[tokio::test]
    async fn test_domain_client_testnet() {
        let built = DydxHttpClient::new(None, Some(30), None, true, None);
        assert!(built.is_ok());

        let domain = built.unwrap();
        assert!(domain.is_testnet());
        assert_eq!(domain.base_url(), DYDX_TESTNET_HTTP_URL);
    }

    /// The `Default` implementation yields a mainnet client with an uninitialized cache.
    #[tokio::test]
    async fn test_domain_client_default() {
        let domain = DydxHttpClient::default();
        assert!(!domain.is_testnet());
        assert_eq!(domain.base_url(), DYDX_HTTP_URL);
        assert!(!domain.is_cache_initialized());
    }

    /// Clones taken before and after an insert into the cache report the matching
    /// initialization state.
    #[tokio::test]
    async fn test_domain_client_clone() {
        let original = DydxHttpClient::new(None, Some(30), None, false, None).unwrap();

        // Snapshot taken while the cache is still untouched.
        let early_clone = original.clone();
        assert!(!early_clone.is_cache_initialized());

        original.instrument_cache.insert_instruments_only(vec![]);

        // Snapshot taken once the (empty) instrument insert has happened.
        #[allow(clippy::redundant_clone)]
        let late_clone = original.clone();
        assert!(late_clone.is_cache_initialized());
    }

    /// Looking up an instrument on a default client returns `None`.
    #[rstest]
    fn test_domain_client_get_instrument_not_found() {
        let domain = DydxHttpClient::default();
        let missing_id = InstrumentId::new(Symbol::new("ETH-USD-PERP"), Venue::new("DYDX"));
        assert!(domain.get_instrument(&missing_id).is_none());
    }

    /// A request against a handler that sleeps for 5 seconds must fail in under 3 seconds
    /// when the retry configuration carries a 500 ms operation timeout and no retries.
    #[tokio::test]
    async fn test_http_timeout_respects_configuration_and_does_not_block() {
        use tokio::net::TcpListener;

        // Handler that deliberately outlasts the configured operation timeout.
        async fn slow_handler() -> &'static str {
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            "ok"
        }

        // Bind to an ephemeral local port and serve the slow route in the background.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let router = Router::new().route("/v4/slow", get(slow_handler));

        tokio::spawn(async move {
            axum::serve(listener, router.into_make_service())
                .await
                .unwrap();
        });

        let base_url = format!("http://{addr}");

        // Short operation timeout, zero retries: the call has to give up quickly
        // although the handler sleeps for 5 seconds.
        let retry_config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 0,
            max_delay_ms: 0,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(500),
            immediate_first: true,
            max_elapsed_ms: Some(1_000),
        };

        // The HTTP client timeout stays at a typical value (60); the retry
        // configuration's operation timeout is what should cut the request short.
        let raw =
            DydxRawHttpClient::new(Some(base_url), Some(60), None, false, Some(retry_config))
                .unwrap();

        let started = std::time::Instant::now();
        let outcome: Result<serde_json::Value, error::DydxHttpError> =
            raw.send_request(Method::GET, "/v4/slow", None).await;
        let elapsed = started.elapsed();

        // The request must fail (timeout or client error) without waiting for the
        // full handler duration.
        assert!(outcome.is_err());
        assert!(elapsed < std::time::Duration::from_secs(3));
    }
}