1use std::{
24 collections::HashMap,
25 num::NonZeroU32,
26 sync::{Arc, LazyLock, Mutex},
27};
28
29use anyhow::Context;
30use chrono::{DateTime, Utc};
31use nautilus_core::{
32 MUTEX_POISONED, UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var,
33 time::get_atomic_clock_realtime,
34};
35use nautilus_model::{
36 enums::{OrderSide, OrderType, TimeInForce},
37 events::AccountState,
38 identifiers::{AccountId, ClientOrderId, Symbol, VenueOrderId},
39 instruments::{Instrument, InstrumentAny},
40 reports::{FillReport, OrderStatusReport, PositionStatusReport},
41 types::{Price, Quantity},
42};
43use nautilus_network::{
44 http::{HttpClient, HttpClientError, Method, StatusCode, USER_AGENT},
45 ratelimiter::quota::Quota,
46};
47use serde::{Deserialize, Serialize, de::DeserializeOwned};
48use ustr::Ustr;
49
50use super::{
51 error::CoinbaseIntxHttpError,
52 models::{
53 CoinbaseIntxAsset, CoinbaseIntxBalance, CoinbaseIntxFeeTier, CoinbaseIntxFillList,
54 CoinbaseIntxInstrument, CoinbaseIntxOrder, CoinbaseIntxOrderList, CoinbaseIntxPortfolio,
55 CoinbaseIntxPortfolioDetails, CoinbaseIntxPortfolioFeeRates, CoinbaseIntxPortfolioSummary,
56 CoinbaseIntxPosition,
57 },
58 parse::{
59 parse_account_state, parse_fill_report, parse_instrument_any, parse_order_status_report,
60 parse_position_status_report,
61 },
62 query::{
63 CancelOrderParams, CancelOrdersParams, CreateOrderParams, CreateOrderParamsBuilder,
64 GetOrderParams, GetOrdersParams, GetOrdersParamsBuilder, GetPortfolioFillsParams,
65 GetPortfolioFillsParamsBuilder, ModifyOrderParams,
66 },
67};
68use crate::{
69 common::{
70 consts::COINBASE_INTX_REST_URL,
71 credential::Credential,
72 enums::{CoinbaseIntxOrderType, CoinbaseIntxSide, CoinbaseIntxTimeInForce},
73 },
74 http::{
75 error::ErrorBody,
76 query::{CancelOrdersParamsBuilder, ModifyOrderParamsBuilder},
77 },
78};
79
/// Generic response envelope carrying a code, a message and a list of data items.
///
/// `send_request` attempts to decode error bodies into this shape to extract the
/// `code` and `msg` fields.
#[derive(Debug, Serialize, Deserialize)]
pub struct CoinbaseIntxResponse<T> {
    /// Value of the `code` field.
    pub code: String,
    /// Value of the `msg` field.
    pub msg: String,
    /// Items of the `data` field.
    pub data: Vec<T>,
}
90
91pub static COINBASE_INTX_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
93 Quota::per_second(NonZeroU32::new(100).expect("non-zero")).expect("valid constant")
94});
95
/// Low-level client that sends raw HTTP requests to the Coinbase International REST API.
///
/// Holds the base URL, the underlying [`HttpClient`] and the optional credential
/// used by `sign_request` for authenticated calls.
#[derive(Debug, Clone)]
pub struct CoinbaseIntxHttpInnerClient {
    base_url: String,
    client: HttpClient,
    credential: Option<Credential>,
}
107
108impl Default for CoinbaseIntxHttpInnerClient {
109 fn default() -> Self {
110 Self::new(None, Some(60)).expect("Failed to create default Coinbase INTX HTTP client")
111 }
112}
113
114impl CoinbaseIntxHttpInnerClient {
115 pub fn new(
125 base_url: Option<String>,
126 timeout_secs: Option<u64>,
127 ) -> Result<Self, HttpClientError> {
128 Ok(Self {
129 base_url: base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string()),
130 client: HttpClient::new(
131 Self::default_headers(),
132 vec![],
133 vec![],
134 Some(*COINBASE_INTX_REST_QUOTA),
135 timeout_secs,
136 None, )?,
138 credential: None,
139 })
140 }
141
142 pub fn with_credentials(
149 api_key: String,
150 api_secret: String,
151 api_passphrase: String,
152 base_url: String,
153 timeout_secs: Option<u64>,
154 ) -> Result<Self, HttpClientError> {
155 Ok(Self {
156 base_url,
157 client: HttpClient::new(
158 Self::default_headers(),
159 vec![],
160 vec![],
161 Some(*COINBASE_INTX_REST_QUOTA),
162 timeout_secs,
163 None, )?,
165 credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
166 })
167 }
168
169 fn default_headers() -> HashMap<String, String> {
171 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
172 }
173
174 fn sign_request(
181 &self,
182 method: &Method,
183 path: &str,
184 body: Option<&[u8]>,
185 ) -> Result<HashMap<String, String>, CoinbaseIntxHttpError> {
186 let credential = match self.credential.as_ref() {
187 Some(c) => c,
188 None => return Err(CoinbaseIntxHttpError::MissingCredentials),
189 };
190
191 let api_key = credential.api_key.clone().to_string();
192 let api_passphrase = credential.api_passphrase.clone().to_string();
193 let timestamp = Utc::now().timestamp().to_string();
194 let body_str = body
195 .and_then(|b| String::from_utf8(b.to_vec()).ok())
196 .unwrap_or_default();
197
198 let signature = credential.sign(×tamp, method.as_str(), path, &body_str);
199
200 let mut headers = HashMap::new();
201 headers.insert("Accept".to_string(), "application/json".to_string());
202 headers.insert("CB-ACCESS-KEY".to_string(), api_key);
203 headers.insert("CB-ACCESS-PASSPHRASE".to_string(), api_passphrase);
204 headers.insert("CB-ACCESS-SIGN".to_string(), signature);
205 headers.insert("CB-ACCESS-TIMESTAMP".to_string(), timestamp);
206 headers.insert("Content-Type".to_string(), "application/json".to_string());
207
208 Ok(headers)
209 }
210
211 async fn send_request<T: DeserializeOwned, P: Serialize>(
218 &self,
219 method: Method,
220 path: &str,
221 params: Option<&P>,
222 body: Option<Vec<u8>>,
223 authenticate: bool,
224 ) -> Result<T, CoinbaseIntxHttpError> {
225 let params_str = params
226 .map(serde_urlencoded::to_string)
227 .transpose()
228 .map_err(|e| {
229 CoinbaseIntxHttpError::JsonError(format!("Failed to serialize params: {e}"))
230 })?;
231
232 let full_path = if let Some(ref query) = params_str {
233 if query.is_empty() {
234 path.to_string()
235 } else {
236 format!("{path}?{query}")
237 }
238 } else {
239 path.to_string()
240 };
241
242 let url = format!("{}{}", self.base_url, full_path);
243
244 let headers = if authenticate {
245 Some(self.sign_request(&method, &full_path, body.as_deref())?)
246 } else {
247 None
248 };
249
250 log::trace!("Request: {url:?} {body:?}");
251
252 let resp = self
253 .client
254 .request(method.clone(), url, None, headers, body, None, None)
255 .await?;
256
257 log::trace!("Response: {resp:?}");
258
259 if resp.status.is_success() {
260 let coinbase_response: T = serde_json::from_slice(&resp.body).map_err(|e| {
261 log::error!("Failed to deserialize CoinbaseResponse: {e}");
262 CoinbaseIntxHttpError::JsonError(e.to_string())
263 })?;
264
265 Ok(coinbase_response)
266 } else {
267 let error_body = String::from_utf8_lossy(&resp.body);
268 log::error!(
269 "HTTP error {} with body: {error_body}",
270 resp.status.as_str()
271 );
272
273 if let Ok(parsed_error) = serde_json::from_slice::<CoinbaseIntxResponse<T>>(&resp.body)
274 {
275 return Err(CoinbaseIntxHttpError::CoinbaseError {
276 error_code: parsed_error.code,
277 message: parsed_error.msg,
278 });
279 }
280
281 if let Ok(parsed_error) = serde_json::from_slice::<ErrorBody>(&resp.body)
282 && let (Some(title), Some(error)) = (parsed_error.title, parsed_error.error)
283 {
284 return Err(CoinbaseIntxHttpError::CoinbaseError {
285 error_code: error,
286 message: title,
287 });
288 }
289
290 Err(CoinbaseIntxHttpError::UnexpectedStatus {
291 status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
292 body: error_body.to_string(),
293 })
294 }
295 }
296
297 pub async fn http_list_assets(&self) -> Result<Vec<CoinbaseIntxAsset>, CoinbaseIntxHttpError> {
304 let path = "/api/v1/assets";
305 self.send_request::<_, ()>(Method::GET, path, None, None, false)
306 .await
307 }
308
309 pub async fn http_get_asset_details(
316 &self,
317 asset: &str,
318 ) -> Result<CoinbaseIntxAsset, CoinbaseIntxHttpError> {
319 let path = format!("/api/v1/assets/{asset}");
320 self.send_request::<_, ()>(Method::GET, &path, None, None, false)
321 .await
322 }
323
324 pub async fn http_list_instruments(
331 &self,
332 ) -> Result<Vec<CoinbaseIntxInstrument>, CoinbaseIntxHttpError> {
333 let path = "/api/v1/instruments";
334 self.send_request::<_, ()>(Method::GET, path, None, None, false)
335 .await
336 }
337
338 pub async fn http_get_instrument_details(
345 &self,
346 symbol: &str,
347 ) -> Result<CoinbaseIntxInstrument, CoinbaseIntxHttpError> {
348 let path = format!("/api/v1/instruments/{symbol}");
349 self.send_request::<_, ()>(Method::GET, &path, None, None, false)
350 .await
351 }
352
353 pub async fn http_list_fee_rate_tiers(
360 &self,
361 ) -> Result<Vec<CoinbaseIntxFeeTier>, CoinbaseIntxHttpError> {
362 let path = "/api/v1/fee-rate-tiers";
363 self.send_request::<_, ()>(Method::GET, path, None, None, true)
364 .await
365 }
366
367 pub async fn http_list_portfolios(
374 &self,
375 ) -> Result<Vec<CoinbaseIntxPortfolio>, CoinbaseIntxHttpError> {
376 let path = "/api/v1/portfolios";
377 self.send_request::<_, ()>(Method::GET, path, None, None, true)
378 .await
379 }
380
381 pub async fn http_get_portfolio(
388 &self,
389 portfolio_id: &str,
390 ) -> Result<CoinbaseIntxPortfolio, CoinbaseIntxHttpError> {
391 let path = format!("/api/v1/portfolios/{portfolio_id}");
392 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
393 .await
394 }
395
396 pub async fn http_get_portfolio_details(
403 &self,
404 portfolio_id: &str,
405 ) -> Result<CoinbaseIntxPortfolioDetails, CoinbaseIntxHttpError> {
406 let path = format!("/api/v1/portfolios/{portfolio_id}/detail");
407 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
408 .await
409 }
410
411 pub async fn http_get_portfolio_summary(
418 &self,
419 portfolio_id: &str,
420 ) -> Result<CoinbaseIntxPortfolioSummary, CoinbaseIntxHttpError> {
421 let path = format!("/api/v1/portfolios/{portfolio_id}/summary");
422 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
423 .await
424 }
425
426 pub async fn http_list_portfolio_balances(
433 &self,
434 portfolio_id: &str,
435 ) -> Result<Vec<CoinbaseIntxBalance>, CoinbaseIntxHttpError> {
436 let path = format!("/api/v1/portfolios/{portfolio_id}/balances");
437 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
438 .await
439 }
440
441 pub async fn http_get_portfolio_balance(
448 &self,
449 portfolio_id: &str,
450 asset: &str,
451 ) -> Result<CoinbaseIntxBalance, CoinbaseIntxHttpError> {
452 let path = format!("/api/v1/portfolios/{portfolio_id}/balances/{asset}");
453 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
454 .await
455 }
456
457 pub async fn http_list_portfolio_fills(
464 &self,
465 portfolio_id: &str,
466 params: GetPortfolioFillsParams,
467 ) -> Result<CoinbaseIntxFillList, CoinbaseIntxHttpError> {
468 let path = format!("/api/v1/portfolios/{portfolio_id}/fills");
469 self.send_request(Method::GET, &path, Some(¶ms), None, true)
470 .await
471 }
472
473 pub async fn http_list_portfolio_positions(
480 &self,
481 portfolio_id: &str,
482 ) -> Result<Vec<CoinbaseIntxPosition>, CoinbaseIntxHttpError> {
483 let path = format!("/api/v1/portfolios/{portfolio_id}/positions");
484 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
485 .await
486 }
487
488 pub async fn http_get_portfolio_position(
495 &self,
496 portfolio_id: &str,
497 symbol: &str,
498 ) -> Result<CoinbaseIntxPosition, CoinbaseIntxHttpError> {
499 let path = format!("/api/v1/portfolios/{portfolio_id}/positions/{symbol}");
500 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
501 .await
502 }
503
504 pub async fn http_list_portfolio_fee_rates(
511 &self,
512 ) -> Result<Vec<CoinbaseIntxPortfolioFeeRates>, CoinbaseIntxHttpError> {
513 let path = "/api/v1/portfolios/fee-rates";
514 self.send_request::<_, ()>(Method::GET, path, None, None, true)
515 .await
516 }
517
518 pub async fn http_create_order(
523 &self,
524 params: CreateOrderParams,
525 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
526 let path = "/api/v1/orders";
527 let body = serde_json::to_vec(¶ms)
528 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
529 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
530 .await
531 }
532
533 pub async fn http_get_order(
540 &self,
541 venue_order_id: &str,
542 portfolio_id: &str,
543 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
544 let params = GetOrderParams {
545 portfolio: portfolio_id.to_string(),
546 };
547 let path = format!("/api/v1/orders/{venue_order_id}");
548 self.send_request(Method::GET, &path, Some(¶ms), None, true)
549 .await
550 }
551
552 pub async fn http_list_open_orders(
560 &self,
561 params: GetOrdersParams,
562 ) -> Result<CoinbaseIntxOrderList, CoinbaseIntxHttpError> {
563 self.send_request(Method::GET, "/api/v1/orders", Some(¶ms), None, true)
564 .await
565 }
566
567 pub async fn http_cancel_order(
572 &self,
573 client_order_id: &str,
574 portfolio_id: &str,
575 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
576 let params = CancelOrderParams {
577 portfolio: portfolio_id.to_string(),
578 };
579 let path = format!("/api/v1/orders/{client_order_id}");
580 self.send_request(Method::DELETE, &path, Some(¶ms), None, true)
581 .await
582 }
583
584 pub async fn http_cancel_orders(
589 &self,
590 params: CancelOrdersParams,
591 ) -> Result<Vec<CoinbaseIntxOrder>, CoinbaseIntxHttpError> {
592 self.send_request(Method::DELETE, "/api/v1/orders", Some(¶ms), None, true)
593 .await
594 }
595
596 pub async fn http_modify_order(
603 &self,
604 order_id: &str,
605 params: ModifyOrderParams,
606 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
607 let path = format!("/api/v1/orders/{order_id}");
608 let body = serde_json::to_vec(¶ms)
609 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
610 self.send_request::<_, ()>(Method::PUT, &path, None, Some(body), true)
611 .await
612 }
613}
614
/// Higher-level HTTP client that wraps [`CoinbaseIntxHttpInnerClient`] and converts
/// responses into Nautilus domain types.
///
/// Instruments are cached by raw symbol; the cache supplies the price and size
/// precisions needed when parsing order, fill and position reports.
#[derive(Debug, Clone)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(
        module = "nautilus_trader.core.nautilus_pyo3.coinbase_intx",
        from_py_object
    )
)]
pub struct CoinbaseIntxHttpClient {
    // Shared low-level client performing the raw requests.
    pub(crate) inner: Arc<CoinbaseIntxHttpInnerClient>,
    // Instruments keyed by raw symbol, shared between clones of this client.
    pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
    // Set once an instrument has been added through `add_instrument(s)`.
    cache_initialized: bool,
}
632
633impl Default for CoinbaseIntxHttpClient {
634 fn default() -> Self {
635 Self::new(None, Some(60)).expect("Failed to create default Coinbase INTX HTTP client")
636 }
637}
638
639impl CoinbaseIntxHttpClient {
640 pub fn new(
650 base_url: Option<String>,
651 timeout_secs: Option<u64>,
652 ) -> Result<Self, HttpClientError> {
653 Ok(Self {
654 inner: Arc::new(CoinbaseIntxHttpInnerClient::new(base_url, timeout_secs)?),
655 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
656 cache_initialized: false,
657 })
658 }
659
660 pub fn from_env() -> anyhow::Result<Self> {
667 Self::with_credentials(None, None, None, None, None)
668 }
669
670 pub fn with_credentials(
677 api_key: Option<String>,
678 api_secret: Option<String>,
679 api_passphrase: Option<String>,
680 base_url: Option<String>,
681 timeout_secs: Option<u64>,
682 ) -> anyhow::Result<Self> {
683 let api_key = get_or_env_var(api_key, "COINBASE_INTX_API_KEY")?;
684 let api_secret = get_or_env_var(api_secret, "COINBASE_INTX_API_SECRET")?;
685 let api_passphrase = get_or_env_var(api_passphrase, "COINBASE_INTX_API_PASSPHRASE")?;
686 let base_url = base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string());
687 Ok(Self {
688 inner: Arc::new(
689 CoinbaseIntxHttpInnerClient::with_credentials(
690 api_key,
691 api_secret,
692 api_passphrase,
693 base_url,
694 timeout_secs,
695 )
696 .context("failed to create Coinbase INTX HTTP client")?,
697 ),
698 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
699 cache_initialized: false,
700 })
701 }
702
703 fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
704 match self
705 .instruments_cache
706 .lock()
707 .expect(MUTEX_POISONED)
708 .get(&symbol)
709 {
710 Some(inst) => Ok(inst.clone()), None => anyhow::bail!("Unable to process request, instrument {symbol} not in cache"),
712 }
713 }
714
715 fn generate_ts_init(&self) -> UnixNanos {
716 get_atomic_clock_realtime().get_time_ns()
717 }
718
719 #[must_use]
721 pub fn base_url(&self) -> &str {
722 self.inner.base_url.as_str()
723 }
724
725 #[must_use]
727 pub fn api_key(&self) -> Option<&str> {
728 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
729 }
730
731 #[must_use]
733 pub fn api_key_masked(&self) -> Option<String> {
734 self.inner.credential.as_ref().map(|c| c.api_key_masked())
735 }
736
    /// Returns whether the instrument cache has been marked as initialized.
    ///
    /// The flag starts as `false` and is set by `add_instrument` and `add_instruments`.
    #[must_use]
    pub const fn is_initialized(&self) -> bool {
        self.cache_initialized
    }
744
745 #[must_use]
751 pub fn get_cached_symbols(&self) -> Vec<String> {
752 self.instruments_cache
753 .lock()
754 .unwrap()
755 .keys()
756 .map(ToString::to_string)
757 .collect()
758 }
759
760 pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
768 for inst in instruments {
769 self.instruments_cache
770 .lock()
771 .unwrap()
772 .insert(inst.raw_symbol().inner(), inst);
773 }
774 self.cache_initialized = true;
775 }
776
777 pub fn add_instrument(&mut self, instrument: InstrumentAny) {
785 self.instruments_cache
786 .lock()
787 .unwrap()
788 .insert(instrument.raw_symbol().inner(), instrument);
789 self.cache_initialized = true;
790 }
791
792 pub async fn list_portfolios(&self) -> anyhow::Result<Vec<CoinbaseIntxPortfolio>> {
798 let resp = self
799 .inner
800 .http_list_portfolios()
801 .await
802 .map_err(|e| anyhow::anyhow!(e))?;
803
804 Ok(resp)
805 }
806
807 pub async fn request_account_state(
813 &self,
814 account_id: AccountId,
815 ) -> anyhow::Result<AccountState> {
816 let resp = self
817 .inner
818 .http_list_portfolio_balances(account_id.get_issuers_id())
819 .await
820 .map_err(|e| anyhow::anyhow!(e))?;
821
822 let ts_init = self.generate_ts_init();
823 let account_state = parse_account_state(resp, account_id, ts_init)?;
824
825 Ok(account_state)
826 }
827
828 pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
834 let resp = self
835 .inner
836 .http_list_instruments()
837 .await
838 .map_err(|e| anyhow::anyhow!(e))?;
839
840 let ts_init = self.generate_ts_init();
841
842 let mut instruments: Vec<InstrumentAny> = Vec::new();
843 for inst in &resp {
844 let instrument_any = parse_instrument_any(inst, ts_init);
845 if let Some(instrument_any) = instrument_any {
846 instruments.push(instrument_any);
847 }
848 }
849
850 Ok(instruments)
851 }
852
853 pub async fn request_instrument(&self, symbol: &Symbol) -> anyhow::Result<InstrumentAny> {
859 let resp = self
860 .inner
861 .http_get_instrument_details(symbol.as_str())
862 .await
863 .map_err(|e| anyhow::anyhow!(e))?;
864
865 let ts_init = self.generate_ts_init();
866
867 match parse_instrument_any(&resp, ts_init) {
868 Some(inst) => Ok(inst),
869 None => anyhow::bail!("Unable to parse instrument"),
870 }
871 }
872
873 pub async fn request_order_status_report(
879 &self,
880 account_id: AccountId,
881 venue_order_id: VenueOrderId,
882 ) -> anyhow::Result<OrderStatusReport> {
883 let portfolio_id = account_id.get_issuers_id();
884
885 let resp = self
886 .inner
887 .http_get_order(venue_order_id.as_str(), portfolio_id)
888 .await
889 .map_err(|e| anyhow::anyhow!(e))?;
890
891 let instrument = self.get_instrument_from_cache(resp.symbol)?;
892 let ts_init = self.generate_ts_init();
893
894 let report = parse_order_status_report(
895 resp,
896 account_id,
897 instrument.price_precision(),
898 instrument.size_precision(),
899 ts_init,
900 )?;
901 Ok(report)
902 }
903
904 pub async fn request_order_status_reports(
910 &self,
911 account_id: AccountId,
912 symbol: Symbol,
913 ) -> anyhow::Result<Vec<OrderStatusReport>> {
914 let portfolio_id = account_id.get_issuers_id();
915
916 let mut params = GetOrdersParamsBuilder::default();
917 params.portfolio(portfolio_id);
918 params.instrument(symbol.as_str());
919 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
920
921 let resp = self
922 .inner
923 .http_list_open_orders(params)
924 .await
925 .map_err(|e| anyhow::anyhow!(e))?;
926
927 let ts_init = get_atomic_clock_realtime().get_time_ns();
928
929 let mut reports: Vec<OrderStatusReport> = Vec::new();
930 for order in resp.results {
931 let instrument = self.get_instrument_from_cache(order.symbol)?;
932 let report = parse_order_status_report(
933 order,
934 account_id,
935 instrument.price_precision(),
936 instrument.size_precision(),
937 ts_init,
938 )?;
939 reports.push(report);
940 }
941
942 Ok(reports)
943 }
944
945 pub async fn request_fill_reports(
951 &self,
952 account_id: AccountId,
953 client_order_id: Option<ClientOrderId>,
954 start: Option<DateTime<Utc>>,
955 ) -> anyhow::Result<Vec<FillReport>> {
956 let portfolio_id = account_id.get_issuers_id();
957
958 let mut params = GetPortfolioFillsParamsBuilder::default();
959 if let Some(start) = start {
960 params.time_from(start);
961 }
962 if let Some(client_order_id) = client_order_id {
963 params.client_order_id(client_order_id.to_string());
964 }
965 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
966
967 let resp = self
968 .inner
969 .http_list_portfolio_fills(portfolio_id, params)
970 .await
971 .map_err(|e| anyhow::anyhow!(e))?;
972
973 let ts_init = get_atomic_clock_realtime().get_time_ns();
974
975 let mut reports: Vec<FillReport> = Vec::new();
976 for fill in resp.results {
977 let instrument = self.get_instrument_from_cache(fill.symbol)?;
978 let report = parse_fill_report(
979 fill,
980 account_id,
981 instrument.price_precision(),
982 instrument.size_precision(),
983 ts_init,
984 )?;
985 reports.push(report);
986 }
987
988 Ok(reports)
989 }
990
991 pub async fn request_position_status_report(
997 &self,
998 account_id: AccountId,
999 symbol: Symbol,
1000 ) -> anyhow::Result<PositionStatusReport> {
1001 let portfolio_id = account_id.get_issuers_id();
1002
1003 let resp = self
1004 .inner
1005 .http_get_portfolio_position(portfolio_id, symbol.as_str())
1006 .await
1007 .map_err(|e| anyhow::anyhow!(e))?;
1008
1009 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1010 let ts_init = get_atomic_clock_realtime().get_time_ns();
1011
1012 let report =
1013 parse_position_status_report(resp, account_id, instrument.size_precision(), ts_init)?;
1014 Ok(report)
1015 }
1016
1017 pub async fn request_position_status_reports(
1023 &self,
1024 account_id: AccountId,
1025 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1026 let portfolio_id = account_id.get_issuers_id();
1027
1028 let resp = self
1029 .inner
1030 .http_list_portfolio_positions(portfolio_id)
1031 .await
1032 .map_err(|e| anyhow::anyhow!(e))?;
1033
1034 let ts_init = get_atomic_clock_realtime().get_time_ns();
1035
1036 let mut reports: Vec<PositionStatusReport> = Vec::new();
1037 for position in resp {
1038 let instrument = self.get_instrument_from_cache(position.symbol)?;
1039 let report = parse_position_status_report(
1040 position,
1041 account_id,
1042 instrument.size_precision(),
1043 ts_init,
1044 )?;
1045 reports.push(report);
1046 }
1047
1048 Ok(reports)
1049 }
1050
1051 #[allow(clippy::too_many_arguments)]
1057 pub async fn submit_order(
1058 &self,
1059 account_id: AccountId,
1060 client_order_id: ClientOrderId,
1061 symbol: Symbol,
1062 order_side: OrderSide,
1063 order_type: OrderType,
1064 quantity: Quantity,
1065 time_in_force: TimeInForce,
1066 expire_time: Option<DateTime<Utc>>,
1067 price: Option<Price>,
1068 trigger_price: Option<Price>,
1069 post_only: Option<bool>,
1070 reduce_only: Option<bool>,
1071 ) -> anyhow::Result<OrderStatusReport> {
1072 let coinbase_side: CoinbaseIntxSide = order_side.into();
1073 let coinbase_order_type: CoinbaseIntxOrderType = order_type.into();
1074 let coinbase_tif: CoinbaseIntxTimeInForce = time_in_force.into();
1075
1076 let mut params = CreateOrderParamsBuilder::default();
1077 params.portfolio(account_id.get_issuers_id());
1078 params.client_order_id(client_order_id.as_str());
1079 params.instrument(symbol.as_str());
1080 params.side(coinbase_side);
1081 params.size(quantity.to_string());
1082 params.order_type(coinbase_order_type);
1083 params.tif(coinbase_tif);
1084 if let Some(expire_time) = expire_time {
1085 params.expire_time(expire_time);
1086 }
1087 if let Some(price) = price {
1088 params.price(price.to_string());
1089 }
1090 if let Some(trigger_price) = trigger_price {
1091 params.stop_price(trigger_price.to_string());
1092 }
1093 if let Some(post_only) = post_only {
1094 params.post_only(post_only);
1095 }
1096 if let Some(reduce_only) = reduce_only {
1097 params.close_only(reduce_only);
1098 }
1099 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1100
1101 let resp = self.inner.http_create_order(params).await?;
1102 log::debug!("Submitted order: {resp:?}");
1103
1104 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1105 let ts_init = get_atomic_clock_realtime().get_time_ns();
1106 let report = parse_order_status_report(
1107 resp,
1108 account_id,
1109 instrument.price_precision(),
1110 instrument.size_precision(),
1111 ts_init,
1112 )?;
1113 Ok(report)
1114 }
1115
1116 pub async fn cancel_order(
1122 &self,
1123 account_id: AccountId,
1124 client_order_id: ClientOrderId,
1125 ) -> anyhow::Result<OrderStatusReport> {
1126 let portfolio_id = account_id.get_issuers_id();
1127
1128 let resp = self
1129 .inner
1130 .http_cancel_order(client_order_id.as_str(), portfolio_id)
1131 .await?;
1132 log::debug!("Canceled order: {resp:?}");
1133
1134 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1135 let ts_init = get_atomic_clock_realtime().get_time_ns();
1136
1137 let report = parse_order_status_report(
1138 resp,
1139 account_id,
1140 instrument.price_precision(),
1141 instrument.size_precision(),
1142 ts_init,
1143 )?;
1144 Ok(report)
1145 }
1146
1147 pub async fn cancel_orders(
1153 &self,
1154 account_id: AccountId,
1155 symbol: Symbol,
1156 order_side: Option<OrderSide>,
1157 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1158 let mut params = CancelOrdersParamsBuilder::default();
1159 params.portfolio(account_id.get_issuers_id());
1160 params.instrument(symbol.as_str());
1161 if let Some(side) = order_side {
1162 let side: CoinbaseIntxSide = side.into();
1163 params.side(side);
1164 }
1165 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1166
1167 let resp = self.inner.http_cancel_orders(params).await?;
1168
1169 let instrument = self.get_instrument_from_cache(symbol.inner())?;
1170 let ts_init = get_atomic_clock_realtime().get_time_ns();
1171
1172 let mut reports: Vec<OrderStatusReport> = Vec::with_capacity(resp.len());
1173 for order in resp {
1174 log::debug!("Canceled order: {order:?}");
1175 let report = parse_order_status_report(
1176 order,
1177 account_id,
1178 instrument.price_precision(),
1179 instrument.size_precision(),
1180 ts_init,
1181 )?;
1182 reports.push(report);
1183 }
1184
1185 Ok(reports)
1186 }
1187
1188 #[allow(clippy::too_many_arguments)]
1194 pub async fn modify_order(
1195 &self,
1196 account_id: AccountId,
1197 client_order_id: ClientOrderId,
1198 new_client_order_id: ClientOrderId,
1199 price: Option<Price>,
1200 trigger_price: Option<Price>,
1201 quantity: Option<Quantity>,
1202 ) -> anyhow::Result<OrderStatusReport> {
1203 let mut params = ModifyOrderParamsBuilder::default();
1204 params.portfolio(account_id.get_issuers_id());
1205 params.client_order_id(new_client_order_id.as_str());
1206 if let Some(price) = price {
1207 params.price(price.to_string());
1208 }
1209 if let Some(trigger_price) = trigger_price {
1210 params.price(trigger_price.to_string());
1211 }
1212 if let Some(quantity) = quantity {
1213 params.size(quantity.to_string());
1214 }
1215 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1216
1217 let resp = self
1218 .inner
1219 .http_modify_order(client_order_id.as_str(), params)
1220 .await?;
1221 log::debug!("Modified order {}", resp.client_order_id);
1222
1223 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1224 let ts_init = get_atomic_clock_realtime().get_time_ns();
1225 let report = parse_order_status_report(
1226 resp,
1227 account_id,
1228 instrument.price_precision(),
1229 instrument.size_precision(),
1230 ts_init,
1231 )?;
1232 Ok(report)
1233 }
1234}