Skip to main content

nautilus_coinbase_intx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the [Coinbase International](https://www.coinbase.com/en/international-exchange) REST API.
17//!
18//! This module defines and implements a [`CoinbaseIntxHttpClient`] for
19//! sending requests to various Coinbase endpoints. It handles request signing
20//! (when credentials are provided), constructs valid HTTP requests
21//! using the [`HttpClient`], and parses the responses back into structured data or a [`CoinbaseIntxHttpError`].
22
23use std::{
24    collections::HashMap,
25    num::NonZeroU32,
26    sync::{Arc, LazyLock, Mutex},
27};
28
29use anyhow::Context;
30use chrono::{DateTime, Utc};
31use nautilus_core::{
32    MUTEX_POISONED, UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var,
33    time::get_atomic_clock_realtime,
34};
35use nautilus_model::{
36    enums::{OrderSide, OrderType, TimeInForce},
37    events::AccountState,
38    identifiers::{AccountId, ClientOrderId, Symbol, VenueOrderId},
39    instruments::{Instrument, InstrumentAny},
40    reports::{FillReport, OrderStatusReport, PositionStatusReport},
41    types::{Price, Quantity},
42};
43use nautilus_network::{
44    http::{HttpClient, HttpClientError, Method, StatusCode, USER_AGENT},
45    ratelimiter::quota::Quota,
46};
47use serde::{Deserialize, Serialize, de::DeserializeOwned};
48use ustr::Ustr;
49
50use super::{
51    error::CoinbaseIntxHttpError,
52    models::{
53        CoinbaseIntxAsset, CoinbaseIntxBalance, CoinbaseIntxFeeTier, CoinbaseIntxFillList,
54        CoinbaseIntxInstrument, CoinbaseIntxOrder, CoinbaseIntxOrderList, CoinbaseIntxPortfolio,
55        CoinbaseIntxPortfolioDetails, CoinbaseIntxPortfolioFeeRates, CoinbaseIntxPortfolioSummary,
56        CoinbaseIntxPosition,
57    },
58    parse::{
59        parse_account_state, parse_fill_report, parse_instrument_any, parse_order_status_report,
60        parse_position_status_report,
61    },
62    query::{
63        CancelOrderParams, CancelOrdersParams, CreateOrderParams, CreateOrderParamsBuilder,
64        GetOrderParams, GetOrdersParams, GetOrdersParamsBuilder, GetPortfolioFillsParams,
65        GetPortfolioFillsParamsBuilder, ModifyOrderParams,
66    },
67};
68use crate::{
69    common::{
70        consts::COINBASE_INTX_REST_URL,
71        credential::Credential,
72        enums::{CoinbaseIntxOrderType, CoinbaseIntxSide, CoinbaseIntxTimeInForce},
73    },
74    http::{
75        error::ErrorBody,
76        query::{CancelOrdersParamsBuilder, ModifyOrderParamsBuilder},
77    },
78};
79
80/// Represents an Coinbase HTTP response.
81#[derive(Debug, Serialize, Deserialize)]
82pub struct CoinbaseIntxResponse<T> {
83    /// The Coinbase response code, which is `"0"` for success.
84    pub code: String,
85    /// A message string which can be informational or describe an error cause.
86    pub msg: String,
87    /// The typed data returned by the Coinbase endpoint.
88    pub data: Vec<T>,
89}
90
91// https://docs.cdp.coinbase.com/intx/docs/rate-limits#rest-api-rate-limits
92pub static COINBASE_INTX_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
93    Quota::per_second(NonZeroU32::new(100).expect("non-zero")).expect("valid constant")
94});
95
96/// Provides a lower-level HTTP client for connecting to the [Coinbase International](https://coinbase.com) REST API.
97///
98/// This client wraps the underlying `HttpClient` to handle functionality
99/// specific to Coinbase, such as request signing (for authenticated endpoints),
100/// forming request URLs, and deserializing responses into specific data models.
101#[derive(Debug, Clone)]
102pub struct CoinbaseIntxHttpInnerClient {
103    base_url: String,
104    client: HttpClient,
105    credential: Option<Credential>,
106}
107
108impl Default for CoinbaseIntxHttpInnerClient {
109    fn default() -> Self {
110        Self::new(None, Some(60)).expect("Failed to create default Coinbase INTX HTTP client")
111    }
112}
113
114impl CoinbaseIntxHttpInnerClient {
115    /// Creates a new [`CoinbaseIntxHttpClient`] using the default Coinbase HTTP URL,
116    /// optionally overridden with a custom base url.
117    ///
118    /// This version of the client has **no credentials**, so it can only
119    /// call publicly accessible endpoints.
120    ///
121    /// # Errors
122    ///
123    /// Returns an error if the HTTP client cannot be created.
124    pub fn new(
125        base_url: Option<String>,
126        timeout_secs: Option<u64>,
127    ) -> Result<Self, HttpClientError> {
128        Ok(Self {
129            base_url: base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string()),
130            client: HttpClient::new(
131                Self::default_headers(),
132                vec![],
133                vec![],
134                Some(*COINBASE_INTX_REST_QUOTA),
135                timeout_secs,
136                None, // proxy_url
137            )?,
138            credential: None,
139        })
140    }
141
142    /// Creates a new [`CoinbaseIntxHttpClient`] configured with credentials
143    /// for authenticated requests, optionally using a custom base url.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the HTTP client cannot be created.
148    pub fn with_credentials(
149        api_key: String,
150        api_secret: String,
151        api_passphrase: String,
152        base_url: String,
153        timeout_secs: Option<u64>,
154    ) -> Result<Self, HttpClientError> {
155        Ok(Self {
156            base_url,
157            client: HttpClient::new(
158                Self::default_headers(),
159                vec![],
160                vec![],
161                Some(*COINBASE_INTX_REST_QUOTA),
162                timeout_secs,
163                None, // proxy_url
164            )?,
165            credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
166        })
167    }
168
169    /// Builds the default headers to include with each request (e.g., `User-Agent`).
170    fn default_headers() -> HashMap<String, String> {
171        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
172    }
173
174    /// Signs an Coinbase request with timestamp, API key, passphrase, and signature.
175    ///
176    /// # Errors
177    ///
178    /// Returns [`CoinbaseHttpError::MissingCredentials`] if no credentials are set
179    /// but the request requires authentication.
180    fn sign_request(
181        &self,
182        method: &Method,
183        path: &str,
184        body: Option<&[u8]>,
185    ) -> Result<HashMap<String, String>, CoinbaseIntxHttpError> {
186        let credential = match self.credential.as_ref() {
187            Some(c) => c,
188            None => return Err(CoinbaseIntxHttpError::MissingCredentials),
189        };
190
191        let api_key = credential.api_key.clone().to_string();
192        let api_passphrase = credential.api_passphrase.clone().to_string();
193        let timestamp = Utc::now().timestamp().to_string();
194        let body_str = body
195            .and_then(|b| String::from_utf8(b.to_vec()).ok())
196            .unwrap_or_default();
197
198        let signature = credential.sign(&timestamp, method.as_str(), path, &body_str);
199
200        let mut headers = HashMap::new();
201        headers.insert("Accept".to_string(), "application/json".to_string());
202        headers.insert("CB-ACCESS-KEY".to_string(), api_key);
203        headers.insert("CB-ACCESS-PASSPHRASE".to_string(), api_passphrase);
204        headers.insert("CB-ACCESS-SIGN".to_string(), signature);
205        headers.insert("CB-ACCESS-TIMESTAMP".to_string(), timestamp);
206        headers.insert("Content-Type".to_string(), "application/json".to_string());
207
208        Ok(headers)
209    }
210
211    /// Sends an HTTP request to Coinbase International and parses the response into type `T`.
212    ///
213    /// Internally, this method handles:
214    /// - Building the URL from `base_url` + `path`.
215    /// - Optionally signing the request.
216    /// - Deserializing JSON responses into typed models, or returning a [`CoinbaseIntxHttpError`].
217    async fn send_request<T: DeserializeOwned, P: Serialize>(
218        &self,
219        method: Method,
220        path: &str,
221        params: Option<&P>,
222        body: Option<Vec<u8>>,
223        authenticate: bool,
224    ) -> Result<T, CoinbaseIntxHttpError> {
225        let params_str = params
226            .map(serde_urlencoded::to_string)
227            .transpose()
228            .map_err(|e| {
229                CoinbaseIntxHttpError::JsonError(format!("Failed to serialize params: {e}"))
230            })?;
231
232        let full_path = if let Some(ref query) = params_str {
233            if query.is_empty() {
234                path.to_string()
235            } else {
236                format!("{path}?{query}")
237            }
238        } else {
239            path.to_string()
240        };
241
242        let url = format!("{}{}", self.base_url, full_path);
243
244        let headers = if authenticate {
245            Some(self.sign_request(&method, &full_path, body.as_deref())?)
246        } else {
247            None
248        };
249
250        log::trace!("Request: {url:?} {body:?}");
251
252        let resp = self
253            .client
254            .request(method.clone(), url, None, headers, body, None, None)
255            .await?;
256
257        log::trace!("Response: {resp:?}");
258
259        if resp.status.is_success() {
260            let coinbase_response: T = serde_json::from_slice(&resp.body).map_err(|e| {
261                log::error!("Failed to deserialize CoinbaseResponse: {e}");
262                CoinbaseIntxHttpError::JsonError(e.to_string())
263            })?;
264
265            Ok(coinbase_response)
266        } else {
267            let error_body = String::from_utf8_lossy(&resp.body);
268            log::error!(
269                "HTTP error {} with body: {error_body}",
270                resp.status.as_str()
271            );
272
273            if let Ok(parsed_error) = serde_json::from_slice::<CoinbaseIntxResponse<T>>(&resp.body)
274            {
275                return Err(CoinbaseIntxHttpError::CoinbaseError {
276                    error_code: parsed_error.code,
277                    message: parsed_error.msg,
278                });
279            }
280
281            if let Ok(parsed_error) = serde_json::from_slice::<ErrorBody>(&resp.body)
282                && let (Some(title), Some(error)) = (parsed_error.title, parsed_error.error)
283            {
284                return Err(CoinbaseIntxHttpError::CoinbaseError {
285                    error_code: error,
286                    message: title,
287                });
288            }
289
290            Err(CoinbaseIntxHttpError::UnexpectedStatus {
291                status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
292                body: error_body.to_string(),
293            })
294        }
295    }
296
297    /// Requests a list of all supported assets.
298    ///
299    /// See <https://docs.cdp.coinbase.com/intx/reference/getassets>.
300    /// # Errors
301    ///
302    /// Returns an error if the HTTP request fails or the response cannot be parsed.
303    pub async fn http_list_assets(&self) -> Result<Vec<CoinbaseIntxAsset>, CoinbaseIntxHttpError> {
304        let path = "/api/v1/assets";
305        self.send_request::<_, ()>(Method::GET, path, None, None, false)
306            .await
307    }
308
309    /// Requests information for a specific asset.
310    ///
311    /// See <https://docs.cdp.coinbase.com/intx/reference/getasset>.
312    /// # Errors
313    ///
314    /// Returns an error if the HTTP request fails or the response cannot be parsed.
315    pub async fn http_get_asset_details(
316        &self,
317        asset: &str,
318    ) -> Result<CoinbaseIntxAsset, CoinbaseIntxHttpError> {
319        let path = format!("/api/v1/assets/{asset}");
320        self.send_request::<_, ()>(Method::GET, &path, None, None, false)
321            .await
322    }
323
324    /// Requests all instruments available for trading.
325    ///
326    /// See <https://docs.cdp.coinbase.com/intx/reference/getinstruments>.
327    /// # Errors
328    ///
329    /// Returns an error if the HTTP request fails or the response cannot be parsed.
330    pub async fn http_list_instruments(
331        &self,
332    ) -> Result<Vec<CoinbaseIntxInstrument>, CoinbaseIntxHttpError> {
333        let path = "/api/v1/instruments";
334        self.send_request::<_, ()>(Method::GET, path, None, None, false)
335            .await
336    }
337
338    /// Retrieve a list of instruments with open contracts.
339    ///
340    /// See <https://docs.cdp.coinbase.com/intx/reference/getinstrument>.
341    /// # Errors
342    ///
343    /// Returns an error if the HTTP request fails or the response cannot be parsed.
344    pub async fn http_get_instrument_details(
345        &self,
346        symbol: &str,
347    ) -> Result<CoinbaseIntxInstrument, CoinbaseIntxHttpError> {
348        let path = format!("/api/v1/instruments/{symbol}");
349        self.send_request::<_, ()>(Method::GET, &path, None, None, false)
350            .await
351    }
352
353    /// Return all the fee rate tiers.
354    ///
355    /// See <https://docs.cdp.coinbase.com/intx/reference/getassets>.
356    /// # Errors
357    ///
358    /// Returns an error if the HTTP request fails or the response cannot be parsed.
359    pub async fn http_list_fee_rate_tiers(
360        &self,
361    ) -> Result<Vec<CoinbaseIntxFeeTier>, CoinbaseIntxHttpError> {
362        let path = "/api/v1/fee-rate-tiers";
363        self.send_request::<_, ()>(Method::GET, path, None, None, true)
364            .await
365    }
366
367    /// List all user portfolios.
368    ///
369    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolios>.
370    /// # Errors
371    ///
372    /// Returns an error if the HTTP request fails or the response cannot be parsed.
373    pub async fn http_list_portfolios(
374        &self,
375    ) -> Result<Vec<CoinbaseIntxPortfolio>, CoinbaseIntxHttpError> {
376        let path = "/api/v1/portfolios";
377        self.send_request::<_, ()>(Method::GET, path, None, None, true)
378            .await
379    }
380
381    /// Returns the user's specified portfolio.
382    ///
383    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolio>.
384    /// # Errors
385    ///
386    /// Returns an error if the HTTP request fails or the response cannot be parsed.
387    pub async fn http_get_portfolio(
388        &self,
389        portfolio_id: &str,
390    ) -> Result<CoinbaseIntxPortfolio, CoinbaseIntxHttpError> {
391        let path = format!("/api/v1/portfolios/{portfolio_id}");
392        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
393            .await
394    }
395
396    /// Retrieves the summary, positions, and balances of a portfolio.
397    ///
398    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliodetail>.
399    /// # Errors
400    ///
401    /// Returns an error if the HTTP request fails or the response cannot be parsed.
402    pub async fn http_get_portfolio_details(
403        &self,
404        portfolio_id: &str,
405    ) -> Result<CoinbaseIntxPortfolioDetails, CoinbaseIntxHttpError> {
406        let path = format!("/api/v1/portfolios/{portfolio_id}/detail");
407        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
408            .await
409    }
410
411    /// Retrieves the high level overview of a portfolio.
412    ///
413    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliosummary>.
414    /// # Errors
415    ///
416    /// Returns an error if the HTTP request fails or the response cannot be parsed.
417    pub async fn http_get_portfolio_summary(
418        &self,
419        portfolio_id: &str,
420    ) -> Result<CoinbaseIntxPortfolioSummary, CoinbaseIntxHttpError> {
421        let path = format!("/api/v1/portfolios/{portfolio_id}/summary");
422        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
423            .await
424    }
425
426    /// Returns all balances for a given portfolio.
427    ///
428    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliobalances>.
429    /// # Errors
430    ///
431    /// Returns an error if the HTTP request fails or the response cannot be parsed.
432    pub async fn http_list_portfolio_balances(
433        &self,
434        portfolio_id: &str,
435    ) -> Result<Vec<CoinbaseIntxBalance>, CoinbaseIntxHttpError> {
436        let path = format!("/api/v1/portfolios/{portfolio_id}/balances");
437        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
438            .await
439    }
440
441    /// Retrieves the balance for a given portfolio and asset.
442    ///
443    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliobalance>.
444    /// # Errors
445    ///
446    /// Returns an error if the HTTP request fails or the response cannot be parsed.
447    pub async fn http_get_portfolio_balance(
448        &self,
449        portfolio_id: &str,
450        asset: &str,
451    ) -> Result<CoinbaseIntxBalance, CoinbaseIntxHttpError> {
452        let path = format!("/api/v1/portfolios/{portfolio_id}/balances/{asset}");
453        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
454            .await
455    }
456
457    /// Returns all fills for a given portfolio.
458    ///
459    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliofills>.
460    /// # Errors
461    ///
462    /// Returns an error if the HTTP request fails or the response cannot be parsed.
463    pub async fn http_list_portfolio_fills(
464        &self,
465        portfolio_id: &str,
466        params: GetPortfolioFillsParams,
467    ) -> Result<CoinbaseIntxFillList, CoinbaseIntxHttpError> {
468        let path = format!("/api/v1/portfolios/{portfolio_id}/fills");
469        self.send_request(Method::GET, &path, Some(&params), None, true)
470            .await
471    }
472
473    /// Returns all positions for a given portfolio.
474    ///
475    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliopositions>.
476    /// # Errors
477    ///
478    /// Returns an error if the HTTP request fails or the response cannot be parsed.
479    pub async fn http_list_portfolio_positions(
480        &self,
481        portfolio_id: &str,
482    ) -> Result<Vec<CoinbaseIntxPosition>, CoinbaseIntxHttpError> {
483        let path = format!("/api/v1/portfolios/{portfolio_id}/positions");
484        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
485            .await
486    }
487
488    /// Retrieves the position for a given portfolio and symbol.
489    ///
490    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolioposition>.
491    /// # Errors
492    ///
493    /// Returns an error if the HTTP request fails or the response cannot be parsed.
494    pub async fn http_get_portfolio_position(
495        &self,
496        portfolio_id: &str,
497        symbol: &str,
498    ) -> Result<CoinbaseIntxPosition, CoinbaseIntxHttpError> {
499        let path = format!("/api/v1/portfolios/{portfolio_id}/positions/{symbol}");
500        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
501            .await
502    }
503
504    /// Retrieves the Perpetual Future and Spot fee rate tiers for the user.
505    ///
506    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliosfeerates>.
507    /// # Errors
508    ///
509    /// Returns an error if the HTTP request fails or the response cannot be parsed.
510    pub async fn http_list_portfolio_fee_rates(
511        &self,
512    ) -> Result<Vec<CoinbaseIntxPortfolioFeeRates>, CoinbaseIntxHttpError> {
513        let path = "/api/v1/portfolios/fee-rates";
514        self.send_request::<_, ()>(Method::GET, path, None, None, true)
515            .await
516    }
517
518    /// Create a new order.
519    /// # Errors
520    ///
521    /// Returns an error if the HTTP request fails or the response cannot be parsed.
522    pub async fn http_create_order(
523        &self,
524        params: CreateOrderParams,
525    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
526        let path = "/api/v1/orders";
527        let body = serde_json::to_vec(&params)
528            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
529        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
530            .await
531    }
532
533    /// Retrieves a single order. The order retrieved can be either active or inactive.
534    ///
535    /// See <https://docs.cdp.coinbase.com/intx/reference/getorder>.
536    /// # Errors
537    ///
538    /// Returns an error if the HTTP request fails or the response cannot be parsed.
539    pub async fn http_get_order(
540        &self,
541        venue_order_id: &str,
542        portfolio_id: &str,
543    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
544        let params = GetOrderParams {
545            portfolio: portfolio_id.to_string(),
546        };
547        let path = format!("/api/v1/orders/{venue_order_id}");
548        self.send_request(Method::GET, &path, Some(&params), None, true)
549            .await
550    }
551
552    /// Returns a list of active orders resting on the order book matching the requested criteria.
553    /// Does not return any rejected, cancelled, or fully filled orders as they are not active.
554    ///
555    /// See <https://docs.cdp.coinbase.com/intx/reference/getorders>.
556    /// # Errors
557    ///
558    /// Returns an error if the HTTP request fails or the response cannot be parsed.
559    pub async fn http_list_open_orders(
560        &self,
561        params: GetOrdersParams,
562    ) -> Result<CoinbaseIntxOrderList, CoinbaseIntxHttpError> {
563        self.send_request(Method::GET, "/api/v1/orders", Some(&params), None, true)
564            .await
565    }
566
567    /// Cancels a single open order.
568    /// # Errors
569    ///
570    /// Returns an error if the HTTP request fails or the response cannot be parsed.
571    pub async fn http_cancel_order(
572        &self,
573        client_order_id: &str,
574        portfolio_id: &str,
575    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
576        let params = CancelOrderParams {
577            portfolio: portfolio_id.to_string(),
578        };
579        let path = format!("/api/v1/orders/{client_order_id}");
580        self.send_request(Method::DELETE, &path, Some(&params), None, true)
581            .await
582    }
583
584    /// Cancel user orders.
585    /// # Errors
586    ///
587    /// Returns an error if the HTTP request fails or the response cannot be parsed.
588    pub async fn http_cancel_orders(
589        &self,
590        params: CancelOrdersParams,
591    ) -> Result<Vec<CoinbaseIntxOrder>, CoinbaseIntxHttpError> {
592        self.send_request(Method::DELETE, "/api/v1/orders", Some(&params), None, true)
593            .await
594    }
595
596    /// Modify an open order.
597    ///
598    /// See <https://docs.cdp.coinbase.com/intx/reference/modifyorder>.
599    /// # Errors
600    ///
601    /// Returns an error if the HTTP request fails or the response cannot be parsed.
602    pub async fn http_modify_order(
603        &self,
604        order_id: &str,
605        params: ModifyOrderParams,
606    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
607        let path = format!("/api/v1/orders/{order_id}");
608        let body = serde_json::to_vec(&params)
609            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
610        self.send_request::<_, ()>(Method::PUT, &path, None, Some(body), true)
611            .await
612    }
613}
614
615/// Provides a higher-level HTTP client for the [Coinbase International](https://coinbase.com) REST API.
616///
617/// This client wraps the underlying `CoinbaseIntxHttpInnerClient` to handle conversions
618/// into the Nautilus domain model.
619#[derive(Debug, Clone)]
620#[cfg_attr(
621    feature = "python",
622    pyo3::pyclass(
623        module = "nautilus_trader.core.nautilus_pyo3.coinbase_intx",
624        from_py_object
625    )
626)]
627pub struct CoinbaseIntxHttpClient {
628    pub(crate) inner: Arc<CoinbaseIntxHttpInnerClient>,
629    pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
630    cache_initialized: bool,
631}
632
633impl Default for CoinbaseIntxHttpClient {
634    fn default() -> Self {
635        Self::new(None, Some(60)).expect("Failed to create default Coinbase INTX HTTP client")
636    }
637}
638
639impl CoinbaseIntxHttpClient {
640    /// Creates a new [`CoinbaseIntxHttpClient`] using the default Coinbase HTTP URL,
641    /// optionally overridden with a custom base url.
642    ///
643    /// This version of the client has **no credentials**, so it can only
644    /// call publicly accessible endpoints.
645    ///
646    /// # Errors
647    ///
648    /// Returns an error if the HTTP client cannot be created.
649    pub fn new(
650        base_url: Option<String>,
651        timeout_secs: Option<u64>,
652    ) -> Result<Self, HttpClientError> {
653        Ok(Self {
654            inner: Arc::new(CoinbaseIntxHttpInnerClient::new(base_url, timeout_secs)?),
655            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
656            cache_initialized: false,
657        })
658    }
659
660    /// Creates a new authenticated [`CoinbaseIntxHttpClient`] using environment variables and
661    /// the default Coinbase International HTTP base url.
662    ///
663    /// # Errors
664    ///
665    /// Returns an error if required environment variables are missing or invalid.
666    pub fn from_env() -> anyhow::Result<Self> {
667        Self::with_credentials(None, None, None, None, None)
668    }
669
670    /// Creates a new [`CoinbaseIntxHttpClient`] configured with credentials
671    /// for authenticated requests, optionally using a custom base url.
672    ///
673    /// # Errors
674    ///
675    /// Returns an error if required environment variables are missing or invalid.
676    pub fn with_credentials(
677        api_key: Option<String>,
678        api_secret: Option<String>,
679        api_passphrase: Option<String>,
680        base_url: Option<String>,
681        timeout_secs: Option<u64>,
682    ) -> anyhow::Result<Self> {
683        let api_key = get_or_env_var(api_key, "COINBASE_INTX_API_KEY")?;
684        let api_secret = get_or_env_var(api_secret, "COINBASE_INTX_API_SECRET")?;
685        let api_passphrase = get_or_env_var(api_passphrase, "COINBASE_INTX_API_PASSPHRASE")?;
686        let base_url = base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string());
687        Ok(Self {
688            inner: Arc::new(
689                CoinbaseIntxHttpInnerClient::with_credentials(
690                    api_key,
691                    api_secret,
692                    api_passphrase,
693                    base_url,
694                    timeout_secs,
695                )
696                .context("failed to create Coinbase INTX HTTP client")?,
697            ),
698            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
699            cache_initialized: false,
700        })
701    }
702
703    fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
704        match self
705            .instruments_cache
706            .lock()
707            .expect(MUTEX_POISONED)
708            .get(&symbol)
709        {
710            Some(inst) => Ok(inst.clone()), // TODO: Remove this clone
711            None => anyhow::bail!("Unable to process request, instrument {symbol} not in cache"),
712        }
713    }
714
715    fn generate_ts_init(&self) -> UnixNanos {
716        get_atomic_clock_realtime().get_time_ns()
717    }
718
719    /// Returns the base url being used by the client.
720    #[must_use]
721    pub fn base_url(&self) -> &str {
722        self.inner.base_url.as_str()
723    }
724
725    /// Returns the public API key being used by the client.
726    #[must_use]
727    pub fn api_key(&self) -> Option<&str> {
728        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
729    }
730
731    /// Returns a masked version of the API key for logging purposes.
732    #[must_use]
733    pub fn api_key_masked(&self) -> Option<String> {
734        self.inner.credential.as_ref().map(|c| c.api_key_masked())
735    }
736
737    /// Checks if the client is initialized.
738    ///
739    /// The client is considered initialized if any instruments have been cached from the venue.
740    #[must_use]
741    pub const fn is_initialized(&self) -> bool {
742        self.cache_initialized
743    }
744
745    /// Returns the cached instrument symbols.
746    ///
747    /// # Panics
748    ///
749    /// Panics if the instrument cache mutex is poisoned.
750    #[must_use]
751    pub fn get_cached_symbols(&self) -> Vec<String> {
752        self.instruments_cache
753            .lock()
754            .unwrap()
755            .keys()
756            .map(ToString::to_string)
757            .collect()
758    }
759
760    /// Adds the given instruments into the clients instrument cache.
761    ///
762    /// # Panics
763    ///
764    /// Panics if the instrument cache mutex is poisoned.
765    ///
766    /// Any existing instruments will be replaced.
767    pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
768        for inst in instruments {
769            self.instruments_cache
770                .lock()
771                .unwrap()
772                .insert(inst.raw_symbol().inner(), inst);
773        }
774        self.cache_initialized = true;
775    }
776
777    /// Adds the given instrument into the clients instrument cache.
778    ///
779    /// # Panics
780    ///
781    /// Panics if the instrument cache mutex is poisoned.
782    ///
783    /// Any existing instrument will be replaced.
784    pub fn add_instrument(&mut self, instrument: InstrumentAny) {
785        self.instruments_cache
786            .lock()
787            .unwrap()
788            .insert(instrument.raw_symbol().inner(), instrument);
789        self.cache_initialized = true;
790    }
791
792    /// Requests a list of portfolio details from Coinbase International.
793    ///
794    /// # Errors
795    ///
796    /// Returns an error if the HTTP request fails or the response cannot be parsed.
797    pub async fn list_portfolios(&self) -> anyhow::Result<Vec<CoinbaseIntxPortfolio>> {
798        let resp = self
799            .inner
800            .http_list_portfolios()
801            .await
802            .map_err(|e| anyhow::anyhow!(e))?;
803
804        Ok(resp)
805    }
806
807    /// Requests the account state for the given account ID from Coinbase International.
808    ///
809    /// # Errors
810    ///
811    /// Returns an error if the HTTP request fails or the response cannot be parsed.
812    pub async fn request_account_state(
813        &self,
814        account_id: AccountId,
815    ) -> anyhow::Result<AccountState> {
816        let resp = self
817            .inner
818            .http_list_portfolio_balances(account_id.get_issuers_id())
819            .await
820            .map_err(|e| anyhow::anyhow!(e))?;
821
822        let ts_init = self.generate_ts_init();
823        let account_state = parse_account_state(resp, account_id, ts_init)?;
824
825        Ok(account_state)
826    }
827
828    /// Requests all instruments from Coinbase International.
829    ///
830    /// # Errors
831    ///
832    /// Returns an error if the HTTP request fails or the response cannot be parsed.
833    pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
834        let resp = self
835            .inner
836            .http_list_instruments()
837            .await
838            .map_err(|e| anyhow::anyhow!(e))?;
839
840        let ts_init = self.generate_ts_init();
841
842        let mut instruments: Vec<InstrumentAny> = Vec::new();
843        for inst in &resp {
844            let instrument_any = parse_instrument_any(inst, ts_init);
845            if let Some(instrument_any) = instrument_any {
846                instruments.push(instrument_any);
847            }
848        }
849
850        Ok(instruments)
851    }
852
853    /// Requests the instrument for the given symbol from Coinbase International.
854    ///
855    /// # Errors
856    ///
857    /// Returns an error if the HTTP request fails or the instrument cannot be parsed.
858    pub async fn request_instrument(&self, symbol: &Symbol) -> anyhow::Result<InstrumentAny> {
859        let resp = self
860            .inner
861            .http_get_instrument_details(symbol.as_str())
862            .await
863            .map_err(|e| anyhow::anyhow!(e))?;
864
865        let ts_init = self.generate_ts_init();
866
867        match parse_instrument_any(&resp, ts_init) {
868            Some(inst) => Ok(inst),
869            None => anyhow::bail!("Unable to parse instrument"),
870        }
871    }
872
873    /// Requests an order status report for the given venue order ID from Coinbase International.
874    ///
875    /// # Errors
876    ///
877    /// Returns an error if the HTTP request fails or the response cannot be parsed.
878    pub async fn request_order_status_report(
879        &self,
880        account_id: AccountId,
881        venue_order_id: VenueOrderId,
882    ) -> anyhow::Result<OrderStatusReport> {
883        let portfolio_id = account_id.get_issuers_id();
884
885        let resp = self
886            .inner
887            .http_get_order(venue_order_id.as_str(), portfolio_id)
888            .await
889            .map_err(|e| anyhow::anyhow!(e))?;
890
891        let instrument = self.get_instrument_from_cache(resp.symbol)?;
892        let ts_init = self.generate_ts_init();
893
894        let report = parse_order_status_report(
895            resp,
896            account_id,
897            instrument.price_precision(),
898            instrument.size_precision(),
899            ts_init,
900        )?;
901        Ok(report)
902    }
903
904    /// Requests order status reports for all **open** orders from Coinbase International.
905    ///
906    /// # Errors
907    ///
908    /// Returns an error if the HTTP request fails or the response cannot be parsed.
909    pub async fn request_order_status_reports(
910        &self,
911        account_id: AccountId,
912        symbol: Symbol,
913    ) -> anyhow::Result<Vec<OrderStatusReport>> {
914        let portfolio_id = account_id.get_issuers_id();
915
916        let mut params = GetOrdersParamsBuilder::default();
917        params.portfolio(portfolio_id);
918        params.instrument(symbol.as_str());
919        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
920
921        let resp = self
922            .inner
923            .http_list_open_orders(params)
924            .await
925            .map_err(|e| anyhow::anyhow!(e))?;
926
927        let ts_init = get_atomic_clock_realtime().get_time_ns();
928
929        let mut reports: Vec<OrderStatusReport> = Vec::new();
930        for order in resp.results {
931            let instrument = self.get_instrument_from_cache(order.symbol)?;
932            let report = parse_order_status_report(
933                order,
934                account_id,
935                instrument.price_precision(),
936                instrument.size_precision(),
937                ts_init,
938            )?;
939            reports.push(report);
940        }
941
942        Ok(reports)
943    }
944
945    /// Requests all fill reports from Coinbase International.
946    ///
947    /// # Errors
948    ///
949    /// Returns an error if the HTTP request fails or the response cannot be parsed.
950    pub async fn request_fill_reports(
951        &self,
952        account_id: AccountId,
953        client_order_id: Option<ClientOrderId>,
954        start: Option<DateTime<Utc>>,
955    ) -> anyhow::Result<Vec<FillReport>> {
956        let portfolio_id = account_id.get_issuers_id();
957
958        let mut params = GetPortfolioFillsParamsBuilder::default();
959        if let Some(start) = start {
960            params.time_from(start);
961        }
962        if let Some(client_order_id) = client_order_id {
963            params.client_order_id(client_order_id.to_string());
964        }
965        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
966
967        let resp = self
968            .inner
969            .http_list_portfolio_fills(portfolio_id, params)
970            .await
971            .map_err(|e| anyhow::anyhow!(e))?;
972
973        let ts_init = get_atomic_clock_realtime().get_time_ns();
974
975        let mut reports: Vec<FillReport> = Vec::new();
976        for fill in resp.results {
977            let instrument = self.get_instrument_from_cache(fill.symbol)?;
978            let report = parse_fill_report(
979                fill,
980                account_id,
981                instrument.price_precision(),
982                instrument.size_precision(),
983                ts_init,
984            )?;
985            reports.push(report);
986        }
987
988        Ok(reports)
989    }
990
991    /// Requests a position status report from Coinbase International.
992    ///
993    /// # Errors
994    ///
995    /// Returns an error if the HTTP request fails or the response cannot be parsed.
996    pub async fn request_position_status_report(
997        &self,
998        account_id: AccountId,
999        symbol: Symbol,
1000    ) -> anyhow::Result<PositionStatusReport> {
1001        let portfolio_id = account_id.get_issuers_id();
1002
1003        let resp = self
1004            .inner
1005            .http_get_portfolio_position(portfolio_id, symbol.as_str())
1006            .await
1007            .map_err(|e| anyhow::anyhow!(e))?;
1008
1009        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1010        let ts_init = get_atomic_clock_realtime().get_time_ns();
1011
1012        let report =
1013            parse_position_status_report(resp, account_id, instrument.size_precision(), ts_init)?;
1014        Ok(report)
1015    }
1016
1017    /// Requests all position status reports from Coinbase International.
1018    ///
1019    /// # Errors
1020    ///
1021    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1022    pub async fn request_position_status_reports(
1023        &self,
1024        account_id: AccountId,
1025    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1026        let portfolio_id = account_id.get_issuers_id();
1027
1028        let resp = self
1029            .inner
1030            .http_list_portfolio_positions(portfolio_id)
1031            .await
1032            .map_err(|e| anyhow::anyhow!(e))?;
1033
1034        let ts_init = get_atomic_clock_realtime().get_time_ns();
1035
1036        let mut reports: Vec<PositionStatusReport> = Vec::new();
1037        for position in resp {
1038            let instrument = self.get_instrument_from_cache(position.symbol)?;
1039            let report = parse_position_status_report(
1040                position,
1041                account_id,
1042                instrument.size_precision(),
1043                ts_init,
1044            )?;
1045            reports.push(report);
1046        }
1047
1048        Ok(reports)
1049    }
1050
1051    /// Submits a new order to Coinbase International.
1052    ///
1053    /// # Errors
1054    ///
1055    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1056    #[allow(clippy::too_many_arguments)]
1057    pub async fn submit_order(
1058        &self,
1059        account_id: AccountId,
1060        client_order_id: ClientOrderId,
1061        symbol: Symbol,
1062        order_side: OrderSide,
1063        order_type: OrderType,
1064        quantity: Quantity,
1065        time_in_force: TimeInForce,
1066        expire_time: Option<DateTime<Utc>>,
1067        price: Option<Price>,
1068        trigger_price: Option<Price>,
1069        post_only: Option<bool>,
1070        reduce_only: Option<bool>,
1071    ) -> anyhow::Result<OrderStatusReport> {
1072        let coinbase_side: CoinbaseIntxSide = order_side.into();
1073        let coinbase_order_type: CoinbaseIntxOrderType = order_type.into();
1074        let coinbase_tif: CoinbaseIntxTimeInForce = time_in_force.into();
1075
1076        let mut params = CreateOrderParamsBuilder::default();
1077        params.portfolio(account_id.get_issuers_id());
1078        params.client_order_id(client_order_id.as_str());
1079        params.instrument(symbol.as_str());
1080        params.side(coinbase_side);
1081        params.size(quantity.to_string());
1082        params.order_type(coinbase_order_type);
1083        params.tif(coinbase_tif);
1084        if let Some(expire_time) = expire_time {
1085            params.expire_time(expire_time);
1086        }
1087        if let Some(price) = price {
1088            params.price(price.to_string());
1089        }
1090        if let Some(trigger_price) = trigger_price {
1091            params.stop_price(trigger_price.to_string());
1092        }
1093        if let Some(post_only) = post_only {
1094            params.post_only(post_only);
1095        }
1096        if let Some(reduce_only) = reduce_only {
1097            params.close_only(reduce_only);
1098        }
1099        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1100
1101        let resp = self.inner.http_create_order(params).await?;
1102        log::debug!("Submitted order: {resp:?}");
1103
1104        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1105        let ts_init = get_atomic_clock_realtime().get_time_ns();
1106        let report = parse_order_status_report(
1107            resp,
1108            account_id,
1109            instrument.price_precision(),
1110            instrument.size_precision(),
1111            ts_init,
1112        )?;
1113        Ok(report)
1114    }
1115
1116    /// Cancels a currently open order on Coinbase International.
1117    ///
1118    /// # Errors
1119    ///
1120    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1121    pub async fn cancel_order(
1122        &self,
1123        account_id: AccountId,
1124        client_order_id: ClientOrderId,
1125    ) -> anyhow::Result<OrderStatusReport> {
1126        let portfolio_id = account_id.get_issuers_id();
1127
1128        let resp = self
1129            .inner
1130            .http_cancel_order(client_order_id.as_str(), portfolio_id)
1131            .await?;
1132        log::debug!("Canceled order: {resp:?}");
1133
1134        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1135        let ts_init = get_atomic_clock_realtime().get_time_ns();
1136
1137        let report = parse_order_status_report(
1138            resp,
1139            account_id,
1140            instrument.price_precision(),
1141            instrument.size_precision(),
1142            ts_init,
1143        )?;
1144        Ok(report)
1145    }
1146
1147    /// Cancels all orders for the given account ID and filter params on Coinbase International.
1148    ///
1149    /// # Errors
1150    ///
1151    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1152    pub async fn cancel_orders(
1153        &self,
1154        account_id: AccountId,
1155        symbol: Symbol,
1156        order_side: Option<OrderSide>,
1157    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1158        let mut params = CancelOrdersParamsBuilder::default();
1159        params.portfolio(account_id.get_issuers_id());
1160        params.instrument(symbol.as_str());
1161        if let Some(side) = order_side {
1162            let side: CoinbaseIntxSide = side.into();
1163            params.side(side);
1164        }
1165        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1166
1167        let resp = self.inner.http_cancel_orders(params).await?;
1168
1169        let instrument = self.get_instrument_from_cache(symbol.inner())?;
1170        let ts_init = get_atomic_clock_realtime().get_time_ns();
1171
1172        let mut reports: Vec<OrderStatusReport> = Vec::with_capacity(resp.len());
1173        for order in resp {
1174            log::debug!("Canceled order: {order:?}");
1175            let report = parse_order_status_report(
1176                order,
1177                account_id,
1178                instrument.price_precision(),
1179                instrument.size_precision(),
1180                ts_init,
1181            )?;
1182            reports.push(report);
1183        }
1184
1185        Ok(reports)
1186    }
1187
1188    /// Modifies a currently open order on Coinbase International.
1189    ///
1190    /// # Errors
1191    ///
1192    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1193    #[allow(clippy::too_many_arguments)]
1194    pub async fn modify_order(
1195        &self,
1196        account_id: AccountId,
1197        client_order_id: ClientOrderId,
1198        new_client_order_id: ClientOrderId,
1199        price: Option<Price>,
1200        trigger_price: Option<Price>,
1201        quantity: Option<Quantity>,
1202    ) -> anyhow::Result<OrderStatusReport> {
1203        let mut params = ModifyOrderParamsBuilder::default();
1204        params.portfolio(account_id.get_issuers_id());
1205        params.client_order_id(new_client_order_id.as_str());
1206        if let Some(price) = price {
1207            params.price(price.to_string());
1208        }
1209        if let Some(trigger_price) = trigger_price {
1210            params.price(trigger_price.to_string());
1211        }
1212        if let Some(quantity) = quantity {
1213            params.size(quantity.to_string());
1214        }
1215        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1216
1217        let resp = self
1218            .inner
1219            .http_modify_order(client_order_id.as_str(), params)
1220            .await?;
1221        log::debug!("Modified order {}", resp.client_order_id);
1222
1223        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1224        let ts_init = get_atomic_clock_realtime().get_time_ns();
1225        let report = parse_order_status_report(
1226            resp,
1227            account_id,
1228            instrument.price_precision(),
1229            instrument.size_precision(),
1230            ts_init,
1231        )?;
1232        Ok(report)
1233    }
1234}