1use super::{Okx, OkxAuth, error, parser};
6use ccxt_core::{
7 Error, ParseError, Result,
8 types::{Balance, Market, OHLCV, Order, OrderBook, OrderSide, OrderType, Ticker, Trade},
9};
10use reqwest::header::{HeaderMap, HeaderValue};
11use serde_json::Value;
12use std::collections::HashMap;
13use tracing::{debug, info, warn};
14
15impl Okx {
16 fn get_timestamp(&self) -> String {
22 chrono::Utc::now()
23 .format("%Y-%m-%dT%H:%M:%S%.3fZ")
24 .to_string()
25 }
26
27 fn get_auth(&self) -> Result<OkxAuth> {
29 let config = &self.base().config;
30
31 let api_key = config
32 .api_key
33 .as_ref()
34 .ok_or_else(|| Error::authentication("API key is required"))?;
35 let secret = config
36 .secret
37 .as_ref()
38 .ok_or_else(|| Error::authentication("API secret is required"))?;
39 let passphrase = config
40 .password
41 .as_ref()
42 .ok_or_else(|| Error::authentication("Passphrase is required"))?;
43
44 Ok(OkxAuth::new(
45 api_key.clone(),
46 secret.clone(),
47 passphrase.clone(),
48 ))
49 }
50
51 pub fn check_required_credentials(&self) -> Result<()> {
53 self.base().check_required_credentials()?;
54 if self.base().config.password.is_none() {
55 return Err(Error::authentication("Passphrase is required for OKX"));
56 }
57 Ok(())
58 }
59
60 fn build_api_path(&self, endpoint: &str) -> String {
62 format!("/api/v5{}", endpoint)
63 }
64
65 fn get_inst_type(&self) -> &str {
67 match self.options().account_mode.as_str() {
68 "cross" | "isolated" => "MARGIN",
69 _ => "SPOT",
70 }
71 }
72
73 async fn public_request(
75 &self,
76 method: &str,
77 path: &str,
78 params: Option<&HashMap<String, String>>,
79 ) -> Result<Value> {
80 let urls = self.urls();
81 let mut url = format!("{}{}", urls.rest, path);
82
83 if let Some(p) = params {
84 if !p.is_empty() {
85 let query: Vec<String> = p
86 .iter()
87 .map(|(k, v)| format!("{}={}", k, urlencoding::encode(v)))
88 .collect();
89 url = format!("{}?{}", url, query.join("&"));
90 }
91 }
92
93 debug!("OKX public request: {} {}", method, url);
94
95 let mut headers = HeaderMap::new();
97 if self.options().demo || self.base().config.sandbox {
98 headers.insert("x-simulated-trading", HeaderValue::from_static("1"));
99 }
100
101 let response = match method.to_uppercase().as_str() {
102 "GET" => {
103 if headers.is_empty() {
104 self.base().http_client.get(&url, None).await?
105 } else {
106 self.base().http_client.get(&url, Some(headers)).await?
107 }
108 }
109 "POST" => {
110 if headers.is_empty() {
111 self.base().http_client.post(&url, None, None).await?
112 } else {
113 self.base()
114 .http_client
115 .post(&url, Some(headers), None)
116 .await?
117 }
118 }
119 _ => {
120 return Err(Error::invalid_request(format!(
121 "Unsupported HTTP method: {}",
122 method
123 )));
124 }
125 };
126
127 if error::is_error_response(&response) {
129 return Err(error::parse_error(&response));
130 }
131
132 Ok(response)
133 }
134
135 async fn private_request(
137 &self,
138 method: &str,
139 path: &str,
140 params: Option<&HashMap<String, String>>,
141 body: Option<&Value>,
142 ) -> Result<Value> {
143 self.check_required_credentials()?;
144
145 let auth = self.get_auth()?;
146 let urls = self.urls();
147 let timestamp = self.get_timestamp();
148
149 let query_string = if let Some(p) = params {
151 if !p.is_empty() {
152 let query: Vec<String> = p
153 .iter()
154 .map(|(k, v)| format!("{}={}", k, urlencoding::encode(v)))
155 .collect();
156 format!("?{}", query.join("&"))
157 } else {
158 String::new()
159 }
160 } else {
161 String::new()
162 };
163
164 let body_string = body
166 .map(|b| serde_json::to_string(b).unwrap_or_default())
167 .unwrap_or_default();
168
169 let sign_path = format!("{}{}", path, query_string);
171 let signature = auth.sign(×tamp, method, &sign_path, &body_string);
172
173 let mut headers = HeaderMap::new();
175 auth.add_auth_headers(&mut headers, ×tamp, &signature);
176 headers.insert("Content-Type", HeaderValue::from_static("application/json"));
177
178 if self.options().demo || self.base().config.sandbox {
180 headers.insert("x-simulated-trading", HeaderValue::from_static("1"));
181 }
182
183 let url = format!("{}{}{}", urls.rest, path, query_string);
184 debug!("OKX private request: {} {}", method, url);
185
186 let response = match method.to_uppercase().as_str() {
187 "GET" => self.base().http_client.get(&url, Some(headers)).await?,
188 "POST" => {
189 let body_value = body.cloned();
190 self.base()
191 .http_client
192 .post(&url, Some(headers), body_value)
193 .await?
194 }
195 "DELETE" => {
196 self.base()
197 .http_client
198 .delete(&url, Some(headers), None)
199 .await?
200 }
201 _ => {
202 return Err(Error::invalid_request(format!(
203 "Unsupported HTTP method: {}",
204 method
205 )));
206 }
207 };
208
209 if error::is_error_response(&response) {
211 return Err(error::parse_error(&response));
212 }
213
214 Ok(response)
215 }
216
217 pub async fn fetch_markets(&self) -> Result<Vec<Market>> {
243 let path = self.build_api_path("/public/instruments");
244 let mut params = HashMap::new();
245 params.insert("instType".to_string(), self.get_inst_type().to_string());
246
247 let response = self.public_request("GET", &path, Some(¶ms)).await?;
248
249 let data = response
250 .get("data")
251 .ok_or_else(|| Error::from(ParseError::missing_field("data")))?;
252
253 let instruments = data.as_array().ok_or_else(|| {
254 Error::from(ParseError::invalid_format(
255 "data",
256 "Expected array of instruments",
257 ))
258 })?;
259
260 let mut markets = Vec::new();
261 for instrument in instruments {
262 match parser::parse_market(instrument) {
263 Ok(market) => markets.push(market),
264 Err(e) => {
265 warn!(error = %e, "Failed to parse market");
266 }
267 }
268 }
269
270 let markets = self.base().set_markets(markets, None).await?;
272
273 info!("Loaded {} markets for OKX", markets.len());
274 Ok(markets)
275 }
276
277 pub async fn load_markets(&self, reload: bool) -> Result<HashMap<String, Market>> {
289 {
290 let cache = self.base().market_cache.read().await;
291 if cache.loaded && !reload {
292 debug!(
293 "Returning cached markets for OKX ({} markets)",
294 cache.markets.len()
295 );
296 return Ok(cache.markets.clone());
297 }
298 }
299
300 info!("Loading markets for OKX (reload: {})", reload);
301 let _markets = self.fetch_markets().await?;
302
303 let cache = self.base().market_cache.read().await;
304 Ok(cache.markets.clone())
305 }
306
307 pub async fn fetch_ticker(&self, symbol: &str) -> Result<Ticker> {
317 let market = self.base().market(symbol).await?;
318
319 let path = self.build_api_path("/market/ticker");
320 let mut params = HashMap::new();
321 params.insert("instId".to_string(), market.id.clone());
322
323 let response = self.public_request("GET", &path, Some(¶ms)).await?;
324
325 let data = response
326 .get("data")
327 .ok_or_else(|| Error::from(ParseError::missing_field("data")))?;
328
329 let tickers = data.as_array().ok_or_else(|| {
331 Error::from(ParseError::invalid_format(
332 "data",
333 "Expected array of tickers",
334 ))
335 })?;
336
337 if tickers.is_empty() {
338 return Err(Error::bad_symbol(format!("No ticker data for {}", symbol)));
339 }
340
341 parser::parse_ticker(&tickers[0], Some(&market))
342 }
343
344 pub async fn fetch_tickers(&self, symbols: Option<Vec<String>>) -> Result<Vec<Ticker>> {
354 let cache = self.base().market_cache.read().await;
355 if !cache.loaded {
356 drop(cache);
357 return Err(Error::exchange(
358 "-1",
359 "Markets not loaded. Call load_markets() first.",
360 ));
361 }
362 drop(cache);
363
364 let path = self.build_api_path("/market/tickers");
365 let mut params = HashMap::new();
366 params.insert("instType".to_string(), self.get_inst_type().to_string());
367
368 let response = self.public_request("GET", &path, Some(¶ms)).await?;
369
370 let data = response
371 .get("data")
372 .ok_or_else(|| Error::from(ParseError::missing_field("data")))?;
373
374 let tickers_array = data.as_array().ok_or_else(|| {
375 Error::from(ParseError::invalid_format(
376 "data",
377 "Expected array of tickers",
378 ))
379 })?;
380
381 let mut tickers = Vec::new();
382 for ticker_data in tickers_array {
383 if let Some(inst_id) = ticker_data["instId"].as_str() {
384 let cache = self.base().market_cache.read().await;
385 if let Some(market) = cache.markets_by_id.get(inst_id) {
386 let market_clone = market.clone();
387 drop(cache);
388
389 match parser::parse_ticker(ticker_data, Some(&market_clone)) {
390 Ok(ticker) => {
391 if let Some(ref syms) = symbols {
392 if syms.contains(&ticker.symbol) {
393 tickers.push(ticker);
394 }
395 } else {
396 tickers.push(ticker);
397 }
398 }
399 Err(e) => {
400 warn!(
401 error = %e,
402 symbol = %inst_id,
403 "Failed to parse ticker"
404 );
405 }
406 }
407 } else {
408 drop(cache);
409 }
410 }
411 }
412
413 Ok(tickers)
414 }
415
416 pub async fn fetch_order_book(&self, symbol: &str, limit: Option<u32>) -> Result<OrderBook> {
431 let market = self.base().market(symbol).await?;
432
433 let path = self.build_api_path("/market/books");
434 let mut params = HashMap::new();
435 params.insert("instId".to_string(), market.id.clone());
436
437 let actual_limit = limit.map(|l| l.min(400)).unwrap_or(100);
440 params.insert("sz".to_string(), actual_limit.to_string());
441
442 let response = self.public_request("GET", &path, Some(¶ms)).await?;
443
444 let data = response
445 .get("data")
446 .ok_or_else(|| Error::from(ParseError::missing_field("data")))?;
447
448 let books = data.as_array().ok_or_else(|| {
450 Error::from(ParseError::invalid_format(
451 "data",
452 "Expected array of orderbooks",
453 ))
454 })?;
455
456 if books.is_empty() {
457 return Err(Error::bad_symbol(format!(
458 "No orderbook data for {}",
459 symbol
460 )));
461 }
462
463 parser::parse_orderbook(&books[0], market.symbol.clone())
464 }
465
466 pub async fn fetch_trades(&self, symbol: &str, limit: Option<u32>) -> Result<Vec<Trade>> {
477 let market = self.base().market(symbol).await?;
478
479 let path = self.build_api_path("/market/trades");
480 let mut params = HashMap::new();
481 params.insert("instId".to_string(), market.id.clone());
482
483 let actual_limit = limit.map(|l| l.min(500)).unwrap_or(100);
485 params.insert("limit".to_string(), actual_limit.to_string());
486
487 let response = self.public_request("GET", &path, Some(¶ms)).await?;
488
489 let data = response
490 .get("data")
491 .ok_or_else(|| Error::from(ParseError::missing_field("data")))?;
492
493 let trades_array = data.as_array().ok_or_else(|| {
494 Error::from(ParseError::invalid_format(
495 "data",
496 "Expected array of trades",
497 ))
498 })?;
499
500 let mut trades = Vec::new();
501 for trade_data in trades_array {
502 match parser::parse_trade(trade_data, Some(&market)) {
503 Ok(trade) => trades.push(trade),
504 Err(e) => {
505 warn!(error = %e, "Failed to parse trade");
506 }
507 }
508 }
509
510 trades.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
512
513 Ok(trades)
514 }
515
516 pub async fn fetch_ohlcv(
529 &self,
530 symbol: &str,
531 timeframe: &str,
532 since: Option<i64>,
533 limit: Option<u32>,
534 ) -> Result<Vec<OHLCV>> {
535 let market = self.base().market(symbol).await?;
536
537 let timeframes = self.timeframes();
539 let okx_timeframe = timeframes.get(timeframe).ok_or_else(|| {
540 Error::invalid_request(format!("Unsupported timeframe: {}", timeframe))
541 })?;
542
543 let path = self.build_api_path("/market/candles");
544 let mut params = HashMap::new();
545 params.insert("instId".to_string(), market.id.clone());
546 params.insert("bar".to_string(), okx_timeframe.clone());
547
548 let actual_limit = limit.map(|l| l.min(300)).unwrap_or(100);
550 params.insert("limit".to_string(), actual_limit.to_string());
551
552 if let Some(start_time) = since {
553 params.insert("after".to_string(), start_time.to_string());
554 }
555
556 let response = self.public_request("GET", &path, Some(¶ms)).await?;
557
558 let data = response
559 .get("data")
560 .ok_or_else(|| Error::from(ParseError::missing_field("data")))?;
561
562 let candles_array = data.as_array().ok_or_else(|| {
563 Error::from(ParseError::invalid_format(
564 "data",
565 "Expected array of candles",
566 ))
567 })?;
568
569 let mut ohlcv = Vec::new();
570 for candle_data in candles_array {
571 match parser::parse_ohlcv(candle_data) {
572 Ok(candle) => ohlcv.push(candle),
573 Err(e) => {
574 warn!(error = %e, "Failed to parse OHLCV");
575 }
576 }
577 }
578
579 ohlcv.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
581
582 Ok(ohlcv)
583 }
584
585 pub async fn fetch_balance(&self) -> Result<Balance> {
599 let path = self.build_api_path("/account/balance");
600 let response = self.private_request("GET", &path, None, None).await?;
601
602 let data = response
603 .get("data")
604 .ok_or_else(|| Error::from(ParseError::missing_field("data")))?;
605
606 let balances = data.as_array().ok_or_else(|| {
608 Error::from(ParseError::invalid_format(
609 "data",
610 "Expected array of balances",
611 ))
612 })?;
613
614 if balances.is_empty() {
615 return Ok(Balance {
616 balances: HashMap::new(),
617 info: HashMap::new(),
618 });
619 }
620
621 parser::parse_balance(&balances[0])
622 }
623
624 pub async fn fetch_my_trades(
636 &self,
637 symbol: &str,
638 since: Option<i64>,
639 limit: Option<u32>,
640 ) -> Result<Vec<Trade>> {
641 let market = self.base().market(symbol).await?;
642
643 let path = self.build_api_path("/trade/fills");
644 let mut params = HashMap::new();
645 params.insert("instId".to_string(), market.id.clone());
646 params.insert("instType".to_string(), self.get_inst_type().to_string());
647
648 let actual_limit = limit.map(|l| l.min(100)).unwrap_or(100);
650 params.insert("limit".to_string(), actual_limit.to_string());
651
652 if let Some(start_time) = since {
653 params.insert("begin".to_string(), start_time.to_string());
654 }
655
656 let response = self
657 .private_request("GET", &path, Some(¶ms), None)
658 .await?;
659
660 let data = response
661 .get("data")
662 .ok_or_else(|| Error::from(ParseError::missing_field("data")))?;
663
664 let trades_array = data.as_array().ok_or_else(|| {
665 Error::from(ParseError::invalid_format(
666 "data",
667 "Expected array of trades",
668 ))
669 })?;
670
671 let mut trades = Vec::new();
672 for trade_data in trades_array {
673 match parser::parse_trade(trade_data, Some(&market)) {
674 Ok(trade) => trades.push(trade),
675 Err(e) => {
676 warn!(error = %e, "Failed to parse my trade");
677 }
678 }
679 }
680
681 Ok(trades)
682 }
683
684 pub async fn create_order(
702 &self,
703 symbol: &str,
704 order_type: OrderType,
705 side: OrderSide,
706 amount: f64,
707 price: Option<f64>,
708 ) -> Result<Order> {
709 let market = self.base().market(symbol).await?;
710
711 let path = self.build_api_path("/trade/order");
712
713 let mut map = serde_json::Map::new();
715 map.insert(
716 "instId".to_string(),
717 serde_json::Value::String(market.id.clone()),
718 );
719 map.insert(
720 "tdMode".to_string(),
721 serde_json::Value::String(self.options().account_mode.clone()),
722 );
723 map.insert(
724 "side".to_string(),
725 serde_json::Value::String(match side {
726 OrderSide::Buy => "buy".to_string(),
727 OrderSide::Sell => "sell".to_string(),
728 }),
729 );
730 map.insert(
731 "ordType".to_string(),
732 serde_json::Value::String(match order_type {
733 OrderType::Market => "market".to_string(),
734 OrderType::Limit => "limit".to_string(),
735 OrderType::LimitMaker => "post_only".to_string(),
736 _ => "limit".to_string(),
737 }),
738 );
739 map.insert(
740 "sz".to_string(),
741 serde_json::Value::String(amount.to_string()),
742 );
743
744 if let Some(p) = price {
746 if order_type == OrderType::Limit || order_type == OrderType::LimitMaker {
747 map.insert("px".to_string(), serde_json::Value::String(p.to_string()));
748 }
749 }
750 let body = serde_json::Value::Object(map);
751
752 let response = self
753 .private_request("POST", &path, None, Some(&body))
754 .await?;
755
756 let data = response
757 .get("data")
758 .ok_or_else(|| Error::from(ParseError::missing_field("data")))?;
759
760 let orders = data.as_array().ok_or_else(|| {
762 Error::from(ParseError::invalid_format(
763 "data",
764 "Expected array of orders",
765 ))
766 })?;
767
768 if orders.is_empty() {
769 return Err(Error::exchange("-1", "No order data returned"));
770 }
771
772 parser::parse_order(&orders[0], Some(&market))
773 }
774
775 pub async fn cancel_order(&self, id: &str, symbol: &str) -> Result<Order> {
786 let market = self.base().market(symbol).await?;
787
788 let path = self.build_api_path("/trade/cancel-order");
789
790 let mut map = serde_json::Map::new();
791 map.insert(
792 "instId".to_string(),
793 serde_json::Value::String(market.id.clone()),
794 );
795 map.insert(
796 "ordId".to_string(),
797 serde_json::Value::String(id.to_string()),
798 );
799 let body = serde_json::Value::Object(map);
800
801 let response = self
802 .private_request("POST", &path, None, Some(&body))
803 .await?;
804
805 let data = response
806 .get("data")
807 .ok_or_else(|| Error::from(ParseError::missing_field("data")))?;
808
809 let orders = data.as_array().ok_or_else(|| {
811 Error::from(ParseError::invalid_format(
812 "data",
813 "Expected array of orders",
814 ))
815 })?;
816
817 if orders.is_empty() {
818 return Err(Error::exchange("-1", "No order data returned"));
819 }
820
821 parser::parse_order(&orders[0], Some(&market))
822 }
823
824 pub async fn fetch_order(&self, id: &str, symbol: &str) -> Result<Order> {
835 let market = self.base().market(symbol).await?;
836
837 let path = self.build_api_path("/trade/order");
838 let mut params = HashMap::new();
839 params.insert("instId".to_string(), market.id.clone());
840 params.insert("ordId".to_string(), id.to_string());
841
842 let response = self
843 .private_request("GET", &path, Some(¶ms), None)
844 .await?;
845
846 let data = response
847 .get("data")
848 .ok_or_else(|| Error::from(ParseError::missing_field("data")))?;
849
850 let orders = data.as_array().ok_or_else(|| {
852 Error::from(ParseError::invalid_format(
853 "data",
854 "Expected array of orders",
855 ))
856 })?;
857
858 if orders.is_empty() {
859 return Err(Error::exchange("51400", "Order not found"));
860 }
861
862 parser::parse_order(&orders[0], Some(&market))
863 }
864
865 pub async fn fetch_open_orders(
877 &self,
878 symbol: Option<&str>,
879 since: Option<i64>,
880 limit: Option<u32>,
881 ) -> Result<Vec<Order>> {
882 let path = self.build_api_path("/trade/orders-pending");
883 let mut params = HashMap::new();
884 params.insert("instType".to_string(), self.get_inst_type().to_string());
885
886 let market = if let Some(sym) = symbol {
887 let m = self.base().market(sym).await?;
888 params.insert("instId".to_string(), m.id.clone());
889 Some(m)
890 } else {
891 None
892 };
893
894 let actual_limit = limit.map(|l| l.min(100)).unwrap_or(100);
896 params.insert("limit".to_string(), actual_limit.to_string());
897
898 if let Some(start_time) = since {
899 params.insert("begin".to_string(), start_time.to_string());
900 }
901
902 let response = self
903 .private_request("GET", &path, Some(¶ms), None)
904 .await?;
905
906 let data = response
907 .get("data")
908 .ok_or_else(|| Error::from(ParseError::missing_field("data")))?;
909
910 let orders_array = data.as_array().ok_or_else(|| {
911 Error::from(ParseError::invalid_format(
912 "data",
913 "Expected array of orders",
914 ))
915 })?;
916
917 let mut orders = Vec::new();
918 for order_data in orders_array {
919 match parser::parse_order(order_data, market.as_ref()) {
920 Ok(order) => orders.push(order),
921 Err(e) => {
922 warn!(error = %e, "Failed to parse open order");
923 }
924 }
925 }
926
927 Ok(orders)
928 }
929
930 pub async fn fetch_closed_orders(
942 &self,
943 symbol: Option<&str>,
944 since: Option<i64>,
945 limit: Option<u32>,
946 ) -> Result<Vec<Order>> {
947 let path = self.build_api_path("/trade/orders-history");
948 let mut params = HashMap::new();
949 params.insert("instType".to_string(), self.get_inst_type().to_string());
950
951 let market = if let Some(sym) = symbol {
952 let m = self.base().market(sym).await?;
953 params.insert("instId".to_string(), m.id.clone());
954 Some(m)
955 } else {
956 None
957 };
958
959 let actual_limit = limit.map(|l| l.min(100)).unwrap_or(100);
961 params.insert("limit".to_string(), actual_limit.to_string());
962
963 if let Some(start_time) = since {
964 params.insert("begin".to_string(), start_time.to_string());
965 }
966
967 let response = self
968 .private_request("GET", &path, Some(¶ms), None)
969 .await?;
970
971 let data = response
972 .get("data")
973 .ok_or_else(|| Error::from(ParseError::missing_field("data")))?;
974
975 let orders_array = data.as_array().ok_or_else(|| {
976 Error::from(ParseError::invalid_format(
977 "data",
978 "Expected array of orders",
979 ))
980 })?;
981
982 let mut orders = Vec::new();
983 for order_data in orders_array {
984 match parser::parse_order(order_data, market.as_ref()) {
985 Ok(order) => orders.push(order),
986 Err(e) => {
987 warn!(error = %e, "Failed to parse closed order");
988 }
989 }
990 }
991
992 Ok(orders)
993 }
994}
995
#[cfg(test)]
mod tests {
    use super::*;

    /// The API version prefix is prepended to the endpoint.
    #[test]
    fn test_build_api_path() {
        let client = Okx::builder().build().unwrap();
        assert_eq!(
            client.build_api_path("/public/instruments"),
            "/api/v5/public/instruments"
        );
    }

    /// A client built with default options reports the spot instrument type.
    #[test]
    fn test_get_inst_type_spot() {
        let client = Okx::builder().build().unwrap();
        assert_eq!(client.get_inst_type(), "SPOT");
    }

    /// The "cross" account mode maps to the margin instrument type.
    #[test]
    fn test_get_inst_type_margin() {
        let client = Okx::builder().account_mode("cross").build().unwrap();
        assert_eq!(client.get_inst_type(), "MARGIN");
    }

    /// The timestamp has the date/time separator, the UTC suffix and is
    /// longer than 20 characters.
    #[test]
    fn test_get_timestamp() {
        let client = Okx::builder().build().unwrap();
        let stamp = client.get_timestamp();

        assert!(stamp.contains('T'));
        assert!(stamp.contains('Z'));
        assert!(stamp.len() > 20);
    }
}