1use std::collections::HashMap;
4
5use async_stream::try_stream;
6use futures::stream::Stream;
7use reqwest;
8use uritemplate::UriTemplate;
9use uuid::Uuid;
10
11use crate::accounts::{Account, AccountResponse, AccountsResponse};
12use crate::basic_oauth::AccessTokenProvider;
13use crate::error::{CbError, CbRequestError};
14use crate::fees;
15use crate::orders::{
16 CancelOrderResponse, CancelOrdersResponse, CreateOrderResponse, FillsResponse, Order,
17 OrdersResponse,
18};
19use crate::products::{
20 Candle, CandlesResponse, ContractExpiryType, Granularity, MarketTrades, Pricebook,
21 PricebookResponse, PricebooksResponse, Product, ProductType, ProductsResponse,
22};
23use crate::MAIN_URL;
24use crate::{orders, DateTime};
25
26pub struct CbClient<'a> {
28 https_client: reqwest::Client,
29 access_token_provider: &'a (dyn AccessTokenProvider + 'a),
31}
32
33type Result<T> = std::result::Result<T, CbError>;
34
35impl<'a> CbClient<'a> {
36 pub fn new(oauth_cb_client: &'a (dyn AccessTokenProvider + 'a)) -> Self {
54 CbClient {
55 https_client: reqwest::Client::new(),
56 access_token_provider: oauth_cb_client,
57 }
58 }
59
60 async fn get<T>(&self, request_url: &str) -> Result<T>
61 where
62 T: serde::de::DeserializeOwned,
63 {
64 let response = self
65 .https_client
66 .get(request_url)
67 .bearer_auth(self.access_token_provider.access_token().secret())
68 .send()
69 .await?;
70
71 Self::unpack_response(response).await
72 }
73
74 async fn post<T, U>(&self, request_url: &str, object: &T) -> Result<U>
75 where
76 T: serde::ser::Serialize,
77 U: serde::de::DeserializeOwned,
78 {
79 let response = self
80 .https_client
81 .post(request_url)
82 .json(object)
83 .bearer_auth(self.access_token_provider.access_token().secret())
84 .send()
85 .await?;
86
87 Self::unpack_response(response).await
88 }
89
90 async fn unpack_response<T>(response: reqwest::Response) -> Result<T>
91 where
92 T: serde::de::DeserializeOwned,
93 {
94 let text_content = response.text().await?;
95 println!("{:#?}", text_content);
96
97 match serde_json::from_str::<T>(&text_content) {
98 Ok(result) => Ok(result),
99 Err(err) => match serde_json::from_str::<CbRequestError>(&text_content) {
100 Ok(cb_err) => Err(CbError::Coinbase(cb_err)),
101 Err(_) => Err(CbError::Serde(err)),
102 },
103 }
104 }
105
106 pub fn list_accounts<'b>(
113 &'b self,
114 limit: Option<i32>,
115 cursor: Option<String>,
116 ) -> impl Stream<Item = Result<Vec<Account>>> + 'b {
117 try_stream! {
118 let uri = Self::get_list_accounts_uri(limit, cursor);
119 let mut accounts_response: AccountsResponse = self.get(&uri).await?;
120 yield accounts_response.accounts;
121
122 while accounts_response.has_next {
123 let cursor = Some(accounts_response.cursor.clone());
124 let uri = Self::get_list_accounts_uri(limit, cursor);
125 accounts_response= self.get(&uri).await?;
126 yield accounts_response.accounts;
127 }
128 }
129 }
130
131 fn get_list_accounts_uri(limit: Option<i32>, cursor: Option<String>) -> String {
132 let args = QueryArgs::new()
133 .add_optional_scalar_arg("limit", &limit)
134 .add_optional_scalar_arg("cursor", &cursor);
135 let uri_string = MAIN_URL.to_string() + "/brokerage/accounts{?query*}";
136 let uri = UriTemplate::new(&uri_string)
137 .set("query", args.get())
138 .build();
139 uri
140 }
141
142 pub async fn get_account(&self, account_uuid: Uuid) -> Result<Account> {
148 let uri_string = MAIN_URL.to_string() + "/brokerage/accounts/{uuid}";
149 let uri = UriTemplate::new(&uri_string)
150 .set("uuid", account_uuid.to_string())
151 .build();
152 let account_response: AccountResponse = self.get(&uri).await?;
153 Ok(account_response.account)
154 }
155
156 pub async fn get_best_bid_ask(
160 &self,
161 product_ids: &Option<Vec<&str>>,
162 ) -> Result<Vec<Pricebook>> {
163 let args = QueryArgs::new().add_optional_vec_args("product_ids", product_ids);
164 let uri_string = MAIN_URL.to_string() + "/brokerage/best_bid_ask{?query*}";
165 let uri = UriTemplate::new(&uri_string)
166 .set("query", args.get())
167 .build();
168 let pricebooks_response: PricebooksResponse = self.get(&uri).await?;
169 Ok(pricebooks_response.pricebooks)
170 }
171
172 pub async fn get_product_book(
176 &self,
177 product_id: &str,
178 limit: Option<i32>,
179 ) -> Result<Pricebook> {
180 let args = QueryArgs::new()
181 .add_mandatory_arg("product_id", &product_id)
182 .add_optional_scalar_arg("limit", &limit);
183 let uri_string = MAIN_URL.to_string() + "/brokerage/product_book/{?query*}";
184 let uri = UriTemplate::new(&uri_string)
185 .set("query", args.get())
186 .build();
187 let pricebook_response: PricebookResponse = self.get(&uri).await?;
188
189 Ok(pricebook_response.pricebook)
190 }
191
192 pub async fn list_products(
196 &self,
197 limit: Option<i32>,
198 offset: Option<i32>,
199 product_type: Option<ProductType>,
200 product_ids: &Option<Vec<&str>>,
201 contract_expiry_type: Option<ContractExpiryType>,
202 ) -> Result<Vec<Product>> {
203 let args = QueryArgs::new()
204 .add_optional_scalar_arg("limit", &limit)
205 .add_optional_scalar_arg("offset", &offset)
206 .add_optional_scalar_arg("product_type", &product_type)
207 .add_optional_vec_args("product_ids", product_ids)
208 .add_optional_scalar_arg("contract_expiry_type", &contract_expiry_type);
209 let uri_string = MAIN_URL.to_string() + "/brokerage/products{?query*}";
210 let uri = UriTemplate::new(&uri_string)
211 .set("query", args.get())
212 .build();
213 let products_response: ProductsResponse = self.get(&uri).await?;
214
215 Ok(products_response.products)
216 }
217
218 pub async fn get_product(&self, product_id: &str) -> Result<Product> {
222 let uri_string = MAIN_URL.to_string() + "/brokerage/products/{product_id}";
223 let uri = UriTemplate::new(&uri_string)
224 .set("product_id", product_id.to_string())
225 .build();
226 let product: Product = self.get(&uri).await?;
227 Ok(product)
228 }
229
230 pub async fn get_product_candles(
234 &self,
235 product_id: &str,
236 start: &DateTime,
237 end: &DateTime,
238 granularity: Granularity,
239 ) -> Result<Vec<Candle>> {
240 let uri_string = MAIN_URL.to_string() + "/brokerage/products/{product_id}/candles?start={start}&end={end}&granularity={granularity}";
241 let uri = UriTemplate::new(&uri_string)
242 .set("product_id", product_id.to_string())
243 .set("start", start.timestamp().to_string())
244 .set("end", end.timestamp().to_string())
245 .set("granularity", granularity.to_string())
246 .build();
247 let candles_response: CandlesResponse = self.get(&uri).await?;
248 Ok(candles_response.candles)
249 }
250
251 pub async fn get_market_trades(&self, product_id: &str, limit: i32) -> Result<MarketTrades> {
255 let uri_string =
256 MAIN_URL.to_string() + "/brokerage/products/{product_id}/ticker?limit={limit}";
257 let uri = UriTemplate::new(&uri_string)
258 .set("product_id", product_id.to_string())
259 .set("limit", limit.to_string())
260 .build();
261 let market_trades: MarketTrades = self.get(&uri).await?;
262 Ok(market_trades)
263 }
264
265 pub fn list_orders<'b>(
269 &'b self,
270 product_id: Option<String>,
271 order_status: Option<Vec<orders::Status>>,
272 limit: Option<i32>,
273 start_date: Option<DateTime>,
274 end_date: Option<DateTime>,
275 deprecated_user_native_currency: Option<String>,
276 order_type: Option<orders::OrderType>,
277 order_side: Option<crate::products::Side>,
278 cursor: Option<String>,
279 product_type: Option<ProductType>,
280 order_placement_source: Option<orders::OrderPlacementSource>,
281 contract_expiry_type: Option<ContractExpiryType>,
282 ) -> impl Stream<Item = Result<Vec<Order>>> + 'b {
283 try_stream! {
284 let uri = Self::get_list_orders_uri(
285 &product_id,&order_status, &limit, &start_date, &end_date,
286 &deprecated_user_native_currency, &order_type, &order_side,
287 &cursor, &product_type, &order_placement_source, &contract_expiry_type);
288
289 let mut orders_response: OrdersResponse = self.get(&uri).await?;
290 yield orders_response.orders;
291
292 while orders_response.has_next {
293 let cursor = Some(orders_response.cursor.clone());
294 let uri = Self::get_list_orders_uri(
295 &product_id, &order_status, &limit, &start_date, &end_date,
296 &deprecated_user_native_currency, &order_type, &order_side,
297 &cursor, &product_type, &order_placement_source, &contract_expiry_type);
298 orders_response= self.get(&uri).await?;
299 yield orders_response.orders;
300 }
301 }
302 }
303
304 fn get_list_orders_uri(
307 product_id: &Option<String>,
308 order_status: &Option<Vec<orders::Status>>,
309 limit: &Option<i32>,
310 start_date: &Option<DateTime>,
311 end_date: &Option<DateTime>,
312 deprecated_user_native_currency: &Option<String>,
313 order_type: &Option<orders::OrderType>,
314 order_side: &Option<crate::products::Side>,
315 cursor: &Option<String>,
316 product_type: &Option<ProductType>,
317 order_placement_source: &Option<orders::OrderPlacementSource>,
318 contract_expiry_type: &Option<ContractExpiryType>,
319 ) -> String {
320 let args = QueryArgs::new()
321 .add_optional_scalar_arg("product_id", product_id)
322 .add_optional_vec_args("order_status", order_status)
323 .add_optional_scalar_arg("limit", limit)
324 .add_optional_datetime_arg("start_date", start_date) .add_optional_datetime_arg("end_date", end_date)
326 .add_optional_scalar_arg(
327 "deprecated_user_native_currency",
328 deprecated_user_native_currency,
329 )
330 .add_optional_scalar_arg("order_type", order_type)
331 .add_optional_scalar_arg("order_side", order_side)
332 .add_optional_scalar_arg("cursor", cursor)
333 .add_optional_scalar_arg("product_type", product_type)
334 .add_optional_scalar_arg("order_placement_source", order_placement_source)
335 .add_optional_scalar_arg("contract_expirty_type", contract_expiry_type);
336
337 let uri_string = MAIN_URL.to_string() + "/brokerage/orders/historical/batch{?query*}";
338 let uri = UriTemplate::new(&uri_string)
339 .set("query", args.get())
340 .build();
341 uri
342 }
343
344 pub fn list_fills<'b>(
348 &'b self,
349 order_id: Option<String>,
350 product_id: Option<String>,
351 start_sequence_timestamp: Option<DateTime>,
352 end_sequence_timestamp: Option<DateTime>,
353 limit: Option<i64>, cursor: Option<String>,
355 ) -> impl Stream<Item = Result<Vec<orders::Fill>>> + 'b {
356 try_stream! {
357 let uri = Self::get_list_fills_uri(&order_id, &product_id, &start_sequence_timestamp, &end_sequence_timestamp, &limit, &cursor);
358
359 let mut fills_response: FillsResponse = self.get(&uri).await?;
360 yield fills_response.fills;
361
362 while fills_response.cursor != "" { let cursor = Some(fills_response.cursor.clone());
364 let uri = Self::get_list_fills_uri(&order_id, &product_id, &start_sequence_timestamp, &end_sequence_timestamp, &limit, &cursor);
365 fills_response= self.get(&uri).await?;
366 yield fills_response.fills;
367 }
368 }
369 }
370
371 fn get_list_fills_uri(
372 order_id: &Option<String>,
373 product_id: &Option<String>,
374 start_sequence_timestamp: &Option<DateTime>,
375 end_sequence_timestamp: &Option<DateTime>,
376 limit: &Option<i64>, cursor: &Option<String>,
378 ) -> String {
379 let args = QueryArgs::new()
380 .add_optional_scalar_arg("order_id", order_id)
381 .add_optional_scalar_arg("product_id", product_id)
382 .add_optional_scalar_arg("start_sequence_timestamp", start_sequence_timestamp)
383 .add_optional_scalar_arg("end_sequence_timestamp", end_sequence_timestamp)
384 .add_optional_scalar_arg("limit", limit)
385 .add_optional_scalar_arg("cursor", cursor);
386 let uri_string = MAIN_URL.to_string() + "/brokerage/orders/historical/fills{?query*}";
387 let uri = UriTemplate::new(&uri_string)
388 .set("query", args.get())
389 .build();
390 uri
391 }
392
393 pub async fn get_order(&self, order_id: &str) -> Result<Order> {
397 let uri_string = MAIN_URL.to_string() + "/brokerage/orders/historical/{order_id}";
398 let uri = UriTemplate::new(&uri_string)
399 .set("order_id", order_id.to_string())
400 .build();
401 let order_response: orders::OrderResponse = self.get(&uri).await?;
402 Ok(order_response.order)
403 }
404
405 pub async fn get_transactions_summary(
409 &self,
410 start_date: Option<DateTime>,
411 end_date: Option<DateTime>,
412 user_native_currency: Option<String>,
413 product_type: Option<ProductType>,
414 contract_expiry_type: Option<ContractExpiryType>,
415 ) -> Result<fees::TransactionsSummary> {
416 let args = QueryArgs::new()
417 .add_optional_datetime_arg("start_date", &start_date)
418 .add_optional_datetime_arg("end_date", &end_date)
419 .add_optional_scalar_arg("user_native_currency", &user_native_currency)
420 .add_optional_scalar_arg("product_type", &product_type)
421 .add_optional_scalar_arg("contract_expiry_type", &contract_expiry_type);
422 let uri_string = MAIN_URL.to_string() + "/brokerage/transaction_summary{?query*}";
423 let uri = UriTemplate::new(&uri_string)
424 .set("query", args.get())
425 .build();
426
427 let transaction_summary: fees::TransactionsSummary = self.get(&uri).await?;
428 Ok(transaction_summary)
429 }
430
431 pub async fn create_order(&self, order: &orders::OrderToSend) -> Result<CreateOrderResponse> {
437 let uri = MAIN_URL.to_string() + "/brokerage/orders";
438 self.post(&uri, order).await
439 }
440
441 pub async fn cancel_order(&self, order_ids: &Vec<String>) -> Result<Vec<CancelOrderResponse>> {
447 let mut m = HashMap::<&str, &Vec<String>>::new();
448 m.insert("order_ids", order_ids);
449
450 let uri = MAIN_URL.to_string() + "/brokerage/orders/batch_cancel";
451 let response = self
452 .post::<HashMap<&str, &Vec<String>>, CancelOrdersResponse>(&uri, &m)
453 .await?;
454 Ok(response.results)
455 }
456}
457
458struct QueryArgs {
460 data: Vec<(String, String)>,
461}
462
463impl QueryArgs {
464 fn new() -> Self {
465 QueryArgs {
466 data: Vec::<(String, String)>::new(),
467 }
468 }
469
470 fn get(&self) -> Vec<(String, String)> {
471 self.data.clone()
472 }
473
474 fn add_mandatory_arg<T: ToString>(mut self, key: &str, value: &T) -> Self {
475 self.data.push((key.to_string(), value.to_string()));
476 self
477 }
478
479 fn add_optional_scalar_arg<T: ToString>(mut self, key: &str, value: &Option<T>) -> Self {
480 if let Some(x) = value {
481 self.data.push((key.to_string(), x.to_string()));
482 }
483 self
484 }
485
486 fn add_optional_datetime_arg(mut self, key: &str, value: &Option<DateTime>) -> Self {
487 if let Some(x) = value {
488 self.data.push((
489 key.to_string(),
490 x.to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
491 ));
492 }
493 self
494 }
495
496 fn add_optional_vec_args<T: ToString>(mut self, key: &str, values: &Option<Vec<T>>) -> Self {
497 if let Some(xs) = values {
498 self.data.append(
499 &mut xs
500 .iter()
501 .map(|x| (key.to_string(), x.to_string()))
502 .collect::<Vec<(String, String)>>(),
503 );
504 }
505 self
506 }
507}