1use crate::okx::auth::OkxAuth;
8use crate::okx::parser::{parse_orderbook, parse_ticker, parse_trade};
9use ccxt_core::error::{Error, Result};
10use ccxt_core::types::{
11 Balance, BalanceEntry, Fee, Market, Order, OrderBook, OrderSide, OrderStatus, OrderType,
12 Ticker, Trade,
13 financial::{Amount, Cost, Price},
14};
15use ccxt_core::ws_client::{WsClient, WsConfig, WsConnectionState};
16use ccxt_core::ws_exchange::MessageStream;
17use futures::Stream;
18use rust_decimal::Decimal;
19use rust_decimal::prelude::FromStr;
20use serde_json::Value;
21use std::collections::HashMap;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::task::{Context, Poll};
25use tokio::sync::{RwLock, mpsc};
26use tracing::{debug, warn};
27
28const DEFAULT_PING_INTERVAL_MS: u64 = 25000;
31
32const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5000;
34
35const MAX_RECONNECT_ATTEMPTS: u32 = 10;
37
38pub struct OkxWs {
42 client: Arc<WsClient>,
44 subscriptions: Arc<RwLock<Vec<String>>>,
46}
47
48impl OkxWs {
49 pub fn new(url: String) -> Self {
55 let config = WsConfig {
56 url: url.clone(),
57 connect_timeout: 10000,
58 ping_interval: DEFAULT_PING_INTERVAL_MS,
59 reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
60 max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
61 auto_reconnect: true,
62 enable_compression: false,
63 pong_timeout: 90000,
64 ..Default::default()
65 };
66
67 Self {
68 client: Arc::new(WsClient::new(config)),
69 subscriptions: Arc::new(RwLock::new(Vec::new())),
70 }
71 }
72
73 pub async fn connect(&self) -> Result<()> {
75 self.client.connect().await
76 }
77
78 pub async fn disconnect(&self) -> Result<()> {
80 self.client.disconnect().await
81 }
82
83 pub fn state(&self) -> WsConnectionState {
85 self.client.state()
86 }
87
88 pub fn is_connected(&self) -> bool {
90 self.client.is_connected()
91 }
92
93 pub async fn receive(&self) -> Option<Value> {
95 self.client.receive().await
96 }
97
98 pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
104 #[allow(clippy::disallowed_methods)]
107 let msg = serde_json::json!({
108 "op": "subscribe",
109 "args": [{
110 "channel": "tickers",
111 "instId": symbol
112 }]
113 });
114
115 self.client.send_json(&msg).await?;
116
117 let sub_key = format!("ticker:{}", symbol);
118 self.subscriptions.write().await.push(sub_key);
119
120 Ok(())
121 }
122
123 pub async fn subscribe_tickers(&self, symbols: &[String]) -> Result<()> {
129 let mut args = Vec::new();
130 for symbol in symbols {
131 let mut arg_map = serde_json::Map::new();
132 arg_map.insert(
133 "channel".to_string(),
134 serde_json::Value::String("tickers".to_string()),
135 );
136 arg_map.insert(
137 "instId".to_string(),
138 serde_json::Value::String(symbol.clone()),
139 );
140 args.push(serde_json::Value::Object(arg_map));
141 }
142
143 #[allow(clippy::disallowed_methods)]
145 let msg = serde_json::json!({
146 "op": "subscribe",
147 "args": args
148 });
149
150 self.client.send_json(&msg).await?;
151
152 let mut subs = self.subscriptions.write().await;
153 for symbol in symbols {
154 subs.push(format!("ticker:{}", symbol));
155 }
156
157 Ok(())
158 }
159
160 pub async fn watch_tickers(&self, symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>> {
172 if !self.is_connected() {
174 self.connect().await?;
175 }
176
177 self.subscribe_tickers(symbols).await?;
179
180 let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Ticker>>>();
182 let symbols_owned: Vec<String> = symbols.to_vec();
183 let client = Arc::clone(&self.client);
184
185 tokio::spawn(async move {
187 while let Some(msg) = client.receive().await {
188 if let Some(arg) = msg.get("arg") {
190 let channel = arg.get("channel").and_then(|c| c.as_str());
191 let inst_id = arg.get("instId").and_then(|i| i.as_str());
192
193 if channel == Some("tickers") {
194 if let Some(id) = inst_id {
195 let has_data = msg
196 .get("data")
197 .and_then(|d| d.as_array())
198 .and_then(|arr| arr.first())
199 .is_some();
200 if has_data && symbols_owned.iter().any(|s| s == id) {
201 match parse_ws_ticker(&msg, None) {
202 Ok(ticker) => {
203 if tx.send(Ok(vec![ticker])).is_err() {
204 break; }
206 }
207 Err(e) => {
208 if tx.send(Err(e)).is_err() {
209 break;
210 }
211 }
212 }
213 }
214 }
215 }
216 }
217 }
218 });
219
220 Ok(Box::pin(ReceiverStream::new(rx)))
221 }
222 pub async fn subscribe_orderbook(&self, symbol: &str, depth: u32) -> Result<()> {
224 let channel = match depth {
226 d if d <= 5 => "books5",
227 d if d <= 50 => "books50-l2",
228 _ => "books",
229 };
230
231 #[allow(clippy::disallowed_methods)]
233 let msg = serde_json::json!({
234 "op": "subscribe",
235 "args": [{
236 "channel": channel,
237 "instId": symbol
238 }]
239 });
240
241 self.client.send_json(&msg).await?;
242
243 let sub_key = format!("orderbook:{}", symbol);
244 self.subscriptions.write().await.push(sub_key);
245
246 Ok(())
247 }
248
249 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
255 #[allow(clippy::disallowed_methods)]
257 let msg = serde_json::json!({
258 "op": "subscribe",
259 "args": [{
260 "channel": "trades",
261 "instId": symbol
262 }]
263 });
264
265 self.client.send_json(&msg).await?;
266
267 let sub_key = format!("trades:{}", symbol);
268 self.subscriptions.write().await.push(sub_key);
269
270 Ok(())
271 }
272
273 pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
280 let channel = format!("candle{}", interval);
281
282 #[allow(clippy::disallowed_methods)]
284 let msg = serde_json::json!({
285 "op": "subscribe",
286 "args": [{
287 "channel": channel,
288 "instId": symbol
289 }]
290 });
291
292 self.client.send_json(&msg).await?;
293
294 let sub_key = format!("kline:{}:{}", symbol, interval);
295 self.subscriptions.write().await.push(sub_key);
296
297 Ok(())
298 }
299
300 pub async fn unsubscribe(&self, stream_name: String) -> Result<()> {
306 let parts: Vec<&str> = stream_name.split(':').collect();
308 if parts.len() < 2 {
309 return Err(Error::invalid_request(format!(
310 "Invalid stream name: {}",
311 stream_name
312 )));
313 }
314
315 let channel_type = parts[0];
316 let symbol = parts[1];
317
318 let channel = match channel_type {
319 "ticker" => "tickers".to_string(),
320 "orderbook" => "books5".to_string(),
321 "trades" => "trades".to_string(),
322 "kline" => {
323 if parts.len() >= 3 {
324 format!("candle{}", parts[2])
325 } else {
326 return Err(Error::invalid_request(
327 "Kline unsubscribe requires interval",
328 ));
329 }
330 }
331 _ => {
332 return Err(Error::invalid_request(format!(
333 "Unknown channel: {}",
334 channel_type
335 )));
336 }
337 };
338
339 #[allow(clippy::disallowed_methods)]
341 let msg = serde_json::json!({
342 "op": "unsubscribe",
343 "args": [{
344 "channel": channel,
345 "instId": symbol
346 }]
347 });
348
349 self.client.send_json(&msg).await?;
350
351 let mut subs = self.subscriptions.write().await;
353 subs.retain(|s| s != &stream_name);
354
355 Ok(())
356 }
357
358 pub async fn subscriptions(&self) -> Vec<String> {
360 self.subscriptions.read().await.clone()
361 }
362
363 pub async fn watch_ticker(
376 &self,
377 symbol: &str,
378 market: Option<Market>,
379 ) -> Result<MessageStream<Ticker>> {
380 if !self.is_connected() {
382 self.connect().await?;
383 }
384
385 self.subscribe_ticker(symbol).await?;
387
388 let (tx, rx) = mpsc::unbounded_channel::<Result<Ticker>>();
390 let symbol_owned = symbol.to_string();
391 let client = Arc::clone(&self.client);
392
393 tokio::spawn(async move {
395 while let Some(msg) = client.receive().await {
396 if is_ticker_message(&msg, &symbol_owned) {
398 match parse_ws_ticker(&msg, market.as_ref()) {
399 Ok(ticker) => {
400 if tx.send(Ok(ticker)).is_err() {
401 break; }
403 }
404 Err(e) => {
405 if tx.send(Err(e)).is_err() {
406 break;
407 }
408 }
409 }
410 }
411 }
412 });
413
414 Ok(Box::pin(ReceiverStream::new(rx)))
415 }
416
417 pub async fn watch_order_book(
430 &self,
431 symbol: &str,
432 limit: Option<u32>,
433 ) -> Result<MessageStream<OrderBook>> {
434 if !self.is_connected() {
436 self.connect().await?;
437 }
438
439 let depth = limit.unwrap_or(5);
441 self.subscribe_orderbook(symbol, depth).await?;
442
443 let (tx, rx) = mpsc::unbounded_channel::<Result<OrderBook>>();
445 let symbol_owned = symbol.to_string();
446 let unified_symbol = format_unified_symbol(&symbol_owned);
447 let client = Arc::clone(&self.client);
448
449 tokio::spawn(async move {
451 while let Some(msg) = client.receive().await {
452 if is_orderbook_message(&msg, &symbol_owned) {
454 match parse_ws_orderbook(&msg, unified_symbol.clone()) {
455 Ok(orderbook) => {
456 if tx.send(Ok(orderbook)).is_err() {
457 break; }
459 }
460 Err(e) => {
461 if tx.send(Err(e)).is_err() {
462 break;
463 }
464 }
465 }
466 }
467 }
468 });
469
470 Ok(Box::pin(ReceiverStream::new(rx)))
471 }
472
473 pub async fn watch_trades(
486 &self,
487 symbol: &str,
488 market: Option<Market>,
489 ) -> Result<MessageStream<Vec<Trade>>> {
490 if !self.is_connected() {
492 self.connect().await?;
493 }
494
495 self.subscribe_trades(symbol).await?;
497
498 let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Trade>>>();
500 let symbol_owned = symbol.to_string();
501 let client = Arc::clone(&self.client);
502
503 tokio::spawn(async move {
505 while let Some(msg) = client.receive().await {
506 if is_trade_message(&msg, &symbol_owned) {
508 match parse_ws_trades(&msg, market.as_ref()) {
509 Ok(trades) => {
510 if tx.send(Ok(trades)).is_err() {
511 break; }
513 }
514 Err(e) => {
515 if tx.send(Err(e)).is_err() {
516 break;
517 }
518 }
519 }
520 }
521 }
522 });
523
524 Ok(Box::pin(ReceiverStream::new(rx)))
525 }
526
527 pub async fn login(&self, auth: &OkxAuth) -> Result<()> {
548 let timestamp = (chrono::Utc::now().timestamp_millis() / 1000).to_string();
549 let sign = auth.sign(×tamp, "GET", "/users/self/verify", "");
550
551 #[allow(clippy::disallowed_methods)]
552 let msg = serde_json::json!({
553 "op": "login",
554 "args": [{
555 "apiKey": auth.api_key(),
556 "passphrase": auth.passphrase(),
557 "timestamp": timestamp,
558 "sign": sign
559 }]
560 });
561
562 self.client.send_json(&msg).await?;
563
564 let timeout = tokio::time::Duration::from_secs(10);
566 let start = tokio::time::Instant::now();
567
568 while start.elapsed() < timeout {
569 if let Some(resp) = self.client.receive().await {
570 if let Some(event) = resp.get("event").and_then(|e| e.as_str()) {
571 if event == "login" {
572 let code = resp.get("code").and_then(|c| c.as_str()).unwrap_or("1");
573 if code == "0" {
574 debug!("OKX WebSocket login successful");
575 return Ok(());
576 }
577 let msg_text = resp
578 .get("msg")
579 .and_then(|m| m.as_str())
580 .unwrap_or("Unknown error");
581 return Err(Error::authentication(format!(
582 "OKX WebSocket login failed: {} (code: {})",
583 msg_text, code
584 )));
585 }
586 }
587 }
588 }
589
590 Err(Error::authentication(
591 "OKX WebSocket login timed out waiting for response",
592 ))
593 }
594
595 pub async fn subscribe_orders(&self, inst_type: Option<&str>) -> Result<()> {
599 let mut arg = serde_json::Map::new();
600 arg.insert("channel".to_string(), Value::String("orders".to_string()));
601 arg.insert(
602 "instType".to_string(),
603 Value::String(inst_type.unwrap_or("ANY").to_string()),
604 );
605
606 #[allow(clippy::disallowed_methods)]
607 let msg = serde_json::json!({
608 "op": "subscribe",
609 "args": [Value::Object(arg)]
610 });
611
612 self.client.send_json(&msg).await?;
613
614 self.subscriptions.write().await.push("orders".to_string());
615
616 Ok(())
617 }
618
619 pub async fn subscribe_account(&self) -> Result<()> {
623 #[allow(clippy::disallowed_methods)]
624 let msg = serde_json::json!({
625 "op": "subscribe",
626 "args": [{
627 "channel": "account"
628 }]
629 });
630
631 self.client.send_json(&msg).await?;
632
633 self.subscriptions.write().await.push("account".to_string());
634
635 Ok(())
636 }
637
638 pub async fn watch_balance(&self) -> Result<MessageStream<Balance>> {
642 if !self.is_connected() {
643 self.connect().await?;
644 }
645
646 self.subscribe_account().await?;
647
648 let (tx, rx) = mpsc::unbounded_channel::<Result<Balance>>();
649 let client = Arc::clone(&self.client);
650
651 tokio::spawn(async move {
652 while let Some(msg) = client.receive().await {
653 if is_account_message(&msg) {
654 match parse_ws_balance(&msg) {
655 Ok(balance) => {
656 if tx.send(Ok(balance)).is_err() {
657 break;
658 }
659 }
660 Err(e) => {
661 warn!(error = %e, "Failed to parse OKX balance update");
662 if tx.send(Err(e)).is_err() {
663 break;
664 }
665 }
666 }
667 }
668 }
669 });
670
671 Ok(Box::pin(ReceiverStream::new(rx)))
672 }
673
674 pub async fn watch_orders(
678 &self,
679 symbol_filter: Option<String>,
680 ) -> Result<MessageStream<Order>> {
681 if !self.is_connected() {
682 self.connect().await?;
683 }
684
685 self.subscribe_orders(None).await?;
686
687 let (tx, rx) = mpsc::unbounded_channel::<Result<Order>>();
688 let client = Arc::clone(&self.client);
689
690 tokio::spawn(async move {
691 while let Some(msg) = client.receive().await {
692 if is_orders_message(&msg) {
693 if let Some(data_array) = msg.get("data").and_then(|d| d.as_array()) {
694 for data in data_array {
695 if let Some(ref filter) = symbol_filter {
697 let inst_id =
698 data.get("instId").and_then(|i| i.as_str()).unwrap_or("");
699 let unified = inst_id.replace('-', "/");
700 if unified != *filter && inst_id != filter.as_str() {
701 continue;
702 }
703 }
704
705 match parse_ws_order(data) {
706 Ok(order) => {
707 if tx.send(Ok(order)).is_err() {
708 return;
709 }
710 }
711 Err(e) => {
712 warn!(error = %e, "Failed to parse OKX order update");
713 if tx.send(Err(e)).is_err() {
714 return;
715 }
716 }
717 }
718 }
719 }
720 }
721 }
722 });
723
724 Ok(Box::pin(ReceiverStream::new(rx)))
725 }
726
727 pub async fn watch_my_trades(
732 &self,
733 symbol_filter: Option<String>,
734 ) -> Result<MessageStream<Trade>> {
735 if !self.is_connected() {
736 self.connect().await?;
737 }
738
739 self.subscribe_orders(None).await?;
740
741 let (tx, rx) = mpsc::unbounded_channel::<Result<Trade>>();
742 let client = Arc::clone(&self.client);
743
744 tokio::spawn(async move {
745 while let Some(msg) = client.receive().await {
746 if is_orders_message(&msg) {
747 if let Some(data_array) = msg.get("data").and_then(|d| d.as_array()) {
748 for data in data_array {
749 let fill_sz = data
751 .get("fillSz")
752 .and_then(|v| v.as_str())
753 .and_then(|s| s.parse::<f64>().ok())
754 .unwrap_or(0.0);
755
756 if fill_sz <= 0.0 {
757 continue;
758 }
759
760 if let Some(ref filter) = symbol_filter {
762 let inst_id =
763 data.get("instId").and_then(|i| i.as_str()).unwrap_or("");
764 let unified = inst_id.replace('-', "/");
765 if unified != *filter && inst_id != filter.as_str() {
766 continue;
767 }
768 }
769
770 match parse_ws_my_trade(data) {
771 Ok(trade) => {
772 if tx.send(Ok(trade)).is_err() {
773 return;
774 }
775 }
776 Err(e) => {
777 warn!(error = %e, "Failed to parse OKX trade update");
778 if tx.send(Err(e)).is_err() {
779 return;
780 }
781 }
782 }
783 }
784 }
785 }
786 }
787 });
788
789 Ok(Box::pin(ReceiverStream::new(rx)))
790 }
791}
792
793struct ReceiverStream<T> {
799 receiver: mpsc::UnboundedReceiver<T>,
800}
801
802impl<T> ReceiverStream<T> {
803 fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
804 Self { receiver }
805 }
806}
807
808impl<T> Stream for ReceiverStream<T> {
809 type Item = T;
810
811 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
812 self.receiver.poll_recv(cx)
813 }
814}
815
816fn is_ticker_message(msg: &Value, symbol: &str) -> bool {
822 if let Some(arg) = msg.get("arg") {
825 let channel = arg.get("channel").and_then(|c| c.as_str());
826 let inst_id = arg.get("instId").and_then(|i| i.as_str());
827 let has_data = msg
828 .get("data")
829 .and_then(|d| d.as_array())
830 .and_then(|arr| arr.first())
831 .is_some();
832 channel == Some("tickers") && inst_id == Some(symbol) && has_data
833 } else {
834 false
835 }
836}
837
838fn is_orderbook_message(msg: &Value, symbol: &str) -> bool {
840 if let Some(arg) = msg.get("arg") {
843 let channel = arg.get("channel").and_then(|c| c.as_str());
844 let inst_id = arg.get("instId").and_then(|i| i.as_str());
845 let has_data = msg
846 .get("data")
847 .and_then(|d| d.as_array())
848 .and_then(|arr| arr.first())
849 .is_some();
850 if let (Some(ch), Some(id)) = (channel, inst_id) {
851 ch.starts_with("books") && id == symbol && has_data
852 } else {
853 false
854 }
855 } else {
856 false
857 }
858}
859
860fn is_trade_message(msg: &Value, symbol: &str) -> bool {
862 if let Some(arg) = msg.get("arg") {
865 let channel = arg.get("channel").and_then(|c| c.as_str());
866 let inst_id = arg.get("instId").and_then(|i| i.as_str());
867 let has_data = msg
868 .get("data")
869 .and_then(|d| d.as_array())
870 .and_then(|arr| arr.first())
871 .is_some();
872 channel == Some("trades") && inst_id == Some(symbol) && has_data
873 } else {
874 false
875 }
876}
877
878fn format_unified_symbol(symbol: &str) -> String {
880 symbol.replace('-', "/")
881}
882
883pub fn parse_ws_ticker(msg: &Value, market: Option<&Market>) -> Result<Ticker> {
889 let data = msg
892 .get("data")
893 .and_then(|d| d.as_array())
894 .and_then(|arr| arr.first())
895 .ok_or_else(|| Error::invalid_request("Missing data in ticker message"))?;
896
897 parse_ticker(data, market)
898}
899
900pub fn parse_ws_orderbook(msg: &Value, symbol: String) -> Result<OrderBook> {
902 let data = msg
905 .get("data")
906 .and_then(|d| d.as_array())
907 .and_then(|arr| arr.first())
908 .ok_or_else(|| Error::invalid_request("Missing data in orderbook message"))?;
909
910 parse_orderbook(data, symbol)
911}
912
913pub fn parse_ws_trade(msg: &Value, market: Option<&Market>) -> Result<Trade> {
915 let data = msg
918 .get("data")
919 .and_then(|d| d.as_array())
920 .and_then(|arr| arr.first())
921 .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
922
923 parse_trade(data, market)
924}
925
926pub fn parse_ws_trades(msg: &Value, market: Option<&Market>) -> Result<Vec<Trade>> {
928 let data_array = msg
931 .get("data")
932 .and_then(|d| d.as_array())
933 .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
934
935 let mut trades = Vec::with_capacity(data_array.len());
936 for data in data_array {
937 trades.push(parse_trade(data, market)?);
938 }
939
940 Ok(trades)
941}
942
943fn is_account_message(msg: &Value) -> bool {
949 if let Some(arg) = msg.get("arg") {
950 arg.get("channel").and_then(|c| c.as_str()) == Some("account")
951 } else {
952 false
953 }
954}
955
956fn is_orders_message(msg: &Value) -> bool {
958 if let Some(arg) = msg.get("arg") {
959 arg.get("channel").and_then(|c| c.as_str()) == Some("orders")
960 } else {
961 false
962 }
963}
964
965pub fn parse_ws_balance(msg: &Value) -> Result<Balance> {
986 let data = msg
987 .get("data")
988 .and_then(|d| d.as_array())
989 .and_then(|arr| arr.first())
990 .ok_or_else(|| Error::invalid_request("Missing data in account message"))?;
991
992 let mut info = HashMap::new();
994
995 if let Some(details) = data.get("details").and_then(|d| d.as_array()) {
996 for detail in details {
997 let currency = detail
998 .get("ccy")
999 .and_then(|c| c.as_str())
1000 .unwrap_or_default()
1001 .to_string();
1002
1003 if currency.is_empty() {
1004 continue;
1005 }
1006
1007 let free = detail
1008 .get("availBal")
1009 .and_then(|v| v.as_str())
1010 .and_then(|s| Decimal::from_str(s).ok())
1011 .unwrap_or_default();
1012
1013 let used = detail
1014 .get("frozenBal")
1015 .and_then(|v| v.as_str())
1016 .and_then(|s| Decimal::from_str(s).ok())
1017 .unwrap_or_default();
1018
1019 let total = free + used;
1020
1021 info.insert(currency, BalanceEntry { free, used, total });
1022 }
1023 }
1024
1025 Ok(Balance {
1026 balances: info,
1027 info: HashMap::new(),
1028 })
1029}
1030
1031pub fn parse_ws_order(data: &Value) -> Result<Order> {
1054 let inst_id = data
1055 .get("instId")
1056 .and_then(|v| v.as_str())
1057 .unwrap_or_default();
1058 let symbol = inst_id.replace('-', "/");
1059
1060 let id = data
1061 .get("ordId")
1062 .and_then(|v| v.as_str())
1063 .unwrap_or_default()
1064 .to_string();
1065
1066 let client_order_id = data
1067 .get("clOrdId")
1068 .and_then(|v| v.as_str())
1069 .map(ToString::to_string);
1070
1071 let side = match data.get("side").and_then(|v| v.as_str()) {
1072 Some("sell") => OrderSide::Sell,
1073 _ => OrderSide::Buy,
1074 };
1075
1076 let order_type = match data.get("ordType").and_then(|v| v.as_str()) {
1077 Some("market") => OrderType::Market,
1078 _ => OrderType::Limit,
1079 };
1080
1081 let price = data
1082 .get("px")
1083 .and_then(|v| v.as_str())
1084 .and_then(|s| Decimal::from_str(s).ok());
1085
1086 let amount = data
1087 .get("sz")
1088 .and_then(|v| v.as_str())
1089 .and_then(|s| Decimal::from_str(s).ok())
1090 .unwrap_or_default();
1091
1092 let filled = data
1093 .get("accFillSz")
1094 .and_then(|v| v.as_str())
1095 .and_then(|s| Decimal::from_str(s).ok());
1096
1097 let average = data
1098 .get("avgPx")
1099 .and_then(|v| v.as_str())
1100 .and_then(|s| Decimal::from_str(s).ok())
1101 .filter(|d| !d.is_zero());
1102
1103 let cost = match (filled, average) {
1104 (Some(f), Some(a)) => Some(f * a),
1105 _ => None,
1106 };
1107
1108 let status = match data.get("state").and_then(|v| v.as_str()) {
1109 Some("filled") => OrderStatus::Closed,
1110 Some("canceled" | "cancelled") => OrderStatus::Cancelled,
1111 _ => OrderStatus::Open,
1112 };
1113
1114 let fee = {
1115 let fee_amount = data
1116 .get("fee")
1117 .and_then(|v| v.as_str())
1118 .and_then(|s| Decimal::from_str(s).ok());
1119 let fee_currency = data
1120 .get("feeCcy")
1121 .and_then(|v| v.as_str())
1122 .unwrap_or_default()
1123 .to_string();
1124 fee_amount.map(|f| Fee::new(fee_currency, f.abs()))
1125 };
1126
1127 let timestamp = data
1128 .get("cTime")
1129 .and_then(|v| v.as_str())
1130 .and_then(|s| s.parse::<i64>().ok());
1131
1132 Ok(Order {
1133 id,
1134 client_order_id,
1135 symbol,
1136 order_type,
1137 side,
1138 price,
1139 amount,
1140 filled,
1141 remaining: None,
1142 cost,
1143 average,
1144 status,
1145 fee,
1146 fees: None,
1147 timestamp,
1148 datetime: None,
1149 last_trade_timestamp: None,
1150 time_in_force: None,
1151 post_only: None,
1152 reduce_only: None,
1153 stop_price: None,
1154 trigger_price: None,
1155 take_profit_price: None,
1156 stop_loss_price: None,
1157 trailing_delta: None,
1158 trailing_percent: None,
1159 activation_price: None,
1160 callback_rate: None,
1161 working_type: None,
1162 trades: None,
1163 info: HashMap::new(),
1164 })
1165}
1166
1167pub fn parse_ws_my_trade(data: &Value) -> Result<Trade> {
1171 let inst_id = data
1172 .get("instId")
1173 .and_then(|v| v.as_str())
1174 .unwrap_or_default();
1175 let symbol = inst_id.replace('-', "/");
1176
1177 let trade_id = data
1178 .get("tradeId")
1179 .and_then(|v| v.as_str())
1180 .map(ToString::to_string);
1181
1182 let order_id = data
1183 .get("ordId")
1184 .and_then(|v| v.as_str())
1185 .map(ToString::to_string);
1186
1187 let side = match data.get("side").and_then(|v| v.as_str()) {
1188 Some("buy") => OrderSide::Buy,
1189 _ => OrderSide::Sell,
1190 };
1191
1192 let fill_px = data
1193 .get("fillPx")
1194 .and_then(|v| v.as_str())
1195 .and_then(|s| Decimal::from_str(s).ok())
1196 .unwrap_or_default();
1197
1198 let fill_sz = data
1199 .get("fillSz")
1200 .and_then(|v| v.as_str())
1201 .and_then(|s| Decimal::from_str(s).ok())
1202 .unwrap_or_default();
1203
1204 let timestamp = data
1205 .get("fillTime")
1206 .or_else(|| data.get("uTime"))
1207 .and_then(|v| v.as_str())
1208 .and_then(|s| s.parse::<i64>().ok())
1209 .unwrap_or(0);
1210
1211 let fee = {
1212 let fee_amount = data
1213 .get("fillFee")
1214 .or_else(|| data.get("fee"))
1215 .and_then(|v| v.as_str())
1216 .and_then(|s| Decimal::from_str(s).ok());
1217 let fee_currency = data
1218 .get("fillFeeCcy")
1219 .or_else(|| data.get("feeCcy"))
1220 .and_then(|v| v.as_str())
1221 .unwrap_or_default()
1222 .to_string();
1223 fee_amount.map(|f| Fee::new(fee_currency, f.abs()))
1224 };
1225
1226 Ok(Trade {
1227 id: trade_id,
1228 order: order_id,
1229 symbol,
1230 trade_type: None,
1231 side,
1232 taker_or_maker: None,
1233 price: Price(fill_px),
1234 amount: Amount(fill_sz),
1235 cost: Some(Cost(fill_px * fill_sz)),
1236 fee,
1237 timestamp,
1238 datetime: None,
1239 info: HashMap::new(),
1240 })
1241}
1242
1243#[cfg(test)]
1244mod tests {
1245 use super::*;
1246 use ccxt_core::types::financial::Price;
1247 use rust_decimal_macros::dec;
1248
1249 #[test]
1250 fn test_okx_ws_creation() {
1251 let ws = OkxWs::new("wss://ws.okx.com:8443/ws/v5/public".to_string());
1252 assert!(ws.subscriptions.try_read().is_ok());
1253 }
1254
1255 #[tokio::test]
1256 async fn test_subscriptions_empty_by_default() {
1257 let ws = OkxWs::new("wss://ws.okx.com:8443/ws/v5/public".to_string());
1258 let subs = ws.subscriptions().await;
1259 assert!(subs.is_empty());
1260 }
1261
1262 #[test]
1265 fn test_is_ticker_message_true() {
1266 let msg = serde_json::from_str(
1267 r#"{
1268 "arg": {"channel": "tickers", "instId": "BTC-USDT"},
1269 "data": [{}]
1270 }"#,
1271 )
1272 .unwrap();
1273
1274 assert!(is_ticker_message(&msg, "BTC-USDT"));
1275 }
1276
1277 #[test]
1278 fn test_is_ticker_message_wrong_symbol() {
1279 let msg = serde_json::from_str(
1280 r#"{
1281 "arg": {"channel": "tickers", "instId": "ETH-USDT"},
1282 "data": [{}]
1283 }"#,
1284 )
1285 .unwrap();
1286
1287 assert!(!is_ticker_message(&msg, "BTC-USDT"));
1288 }
1289
1290 #[test]
1291 fn test_is_ticker_message_wrong_channel() {
1292 let msg = serde_json::from_str(
1293 r#"{
1294 "arg": {"channel": "trades", "instId": "BTC-USDT"},
1295 "data": [{}]
1296 }"#,
1297 )
1298 .unwrap();
1299
1300 assert!(!is_ticker_message(&msg, "BTC-USDT"));
1301 }
1302
1303 #[test]
1304 fn test_is_orderbook_message_books5() {
1305 let msg = serde_json::from_str(
1306 r#"{
1307 "arg": {"channel": "books5", "instId": "BTC-USDT"},
1308 "data": [{}]
1309 }"#,
1310 )
1311 .unwrap();
1312
1313 assert!(is_orderbook_message(&msg, "BTC-USDT"));
1314 }
1315
1316 #[test]
1317 fn test_is_orderbook_message_books50() {
1318 let msg = serde_json::from_str(
1319 r#"{
1320 "arg": {"channel": "books50-l2", "instId": "BTC-USDT"},
1321 "data": [{}]
1322 }"#,
1323 )
1324 .unwrap();
1325
1326 assert!(is_orderbook_message(&msg, "BTC-USDT"));
1327 }
1328
1329 #[test]
1330 fn test_is_trade_message_true() {
1331 let msg = serde_json::from_str(
1332 r#"{
1333 "arg": {"channel": "trades", "instId": "BTC-USDT"},
1334 "data": [{}]
1335 }"#,
1336 )
1337 .unwrap();
1338
1339 assert!(is_trade_message(&msg, "BTC-USDT"));
1340 }
1341
1342 #[test]
1343 fn test_format_unified_symbol() {
1344 assert_eq!(format_unified_symbol("BTC-USDT"), "BTC/USDT");
1345 assert_eq!(format_unified_symbol("ETH-BTC"), "ETH/BTC");
1346 }
1347
1348 #[test]
1351 fn test_parse_ws_ticker() {
1352 let msg = serde_json::from_str(
1353 r#"{
1354 "arg": {"channel": "tickers", "instId": "BTC-USDT"},
1355 "data": [{
1356 "instId": "BTC-USDT",
1357 "last": "50000.00",
1358 "high24h": "51000.00",
1359 "low24h": "49000.00",
1360 "bidPx": "49999.00",
1361 "askPx": "50001.00",
1362 "vol24h": "1000.5",
1363 "ts": "1700000000000"
1364 }]
1365 }"#,
1366 )
1367 .unwrap();
1368
1369 let ticker = parse_ws_ticker(&msg, None).unwrap();
1370 assert_eq!(ticker.symbol, "BTC/USDT");
1372 assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
1373 assert_eq!(ticker.high, Some(Price::new(dec!(51000.00))));
1374 assert_eq!(ticker.low, Some(Price::new(dec!(49000.00))));
1375 }
1376
1377 #[test]
1378 fn test_parse_ws_ticker_with_market() {
1379 let msg = serde_json::from_str(
1380 r#"{
1381 "arg": {"channel": "tickers", "instId": "BTC-USDT"},
1382 "data": [{
1383 "instId": "BTC-USDT",
1384 "last": "50000.00",
1385 "ts": "1700000000000"
1386 }]
1387 }"#,
1388 )
1389 .unwrap();
1390
1391 let market = Market {
1392 id: "BTC-USDT".to_string(),
1393 symbol: "BTC/USDT".to_string(),
1394 base: "BTC".to_string(),
1395 quote: "USDT".to_string(),
1396 ..Default::default()
1397 };
1398
1399 let ticker = parse_ws_ticker(&msg, Some(&market)).unwrap();
1400 assert_eq!(ticker.symbol, "BTC/USDT");
1401 }
1402
1403 #[test]
1404 fn test_parse_ws_ticker_missing_data() {
1405 let msg = serde_json::from_str(
1406 r#"{
1407 "arg": {"channel": "tickers", "instId": "BTC-USDT"}
1408 }"#,
1409 )
1410 .unwrap();
1411
1412 let result = parse_ws_ticker(&msg, None);
1413 assert!(result.is_err());
1414 }
1415
1416 #[test]
1419 fn test_parse_ws_orderbook() {
1420 let msg = serde_json::from_str(
1421 r#"{
1422 "arg": {"channel": "books5", "instId": "BTC-USDT"},
1423 "data": [{
1424 "asks": [
1425 ["50001.00", "1.0", "0", "1"],
1426 ["50002.00", "3.0", "0", "2"],
1427 ["50003.00", "2.5", "0", "1"]
1428 ],
1429 "bids": [
1430 ["50000.00", "1.5", "0", "2"],
1431 ["49999.00", "2.0", "0", "1"],
1432 ["49998.00", "0.5", "0", "1"]
1433 ],
1434 "ts": "1700000000000"
1435 }]
1436 }"#,
1437 )
1438 .unwrap();
1439
1440 let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
1441 assert_eq!(orderbook.symbol, "BTC/USDT");
1442 assert_eq!(orderbook.bids.len(), 3);
1443 assert_eq!(orderbook.asks.len(), 3);
1444
1445 assert_eq!(orderbook.bids[0].price, Price::new(dec!(50000.00)));
1447 assert_eq!(orderbook.bids[1].price, Price::new(dec!(49999.00)));
1448
1449 assert_eq!(orderbook.asks[0].price, Price::new(dec!(50001.00)));
1451 assert_eq!(orderbook.asks[1].price, Price::new(dec!(50002.00)));
1452 }
1453
1454 #[test]
1455 fn test_parse_ws_orderbook_missing_data() {
1456 let msg = serde_json::from_str(
1457 r#"{
1458 "arg": {"channel": "books5", "instId": "BTC-USDT"}
1459 }"#,
1460 )
1461 .unwrap();
1462
1463 let result = parse_ws_orderbook(&msg, "BTC/USDT".to_string());
1464 assert!(result.is_err());
1465 }
1466
1467 #[test]
1470 fn test_parse_ws_trade() {
1471 let msg = serde_json::from_str(
1472 r#"{
1473 "arg": {"channel": "trades", "instId": "BTC-USDT"},
1474 "data": [{
1475 "instId": "BTC-USDT",
1476 "tradeId": "123456789",
1477 "px": "50000.00",
1478 "sz": "0.5",
1479 "side": "buy",
1480 "ts": "1700000000000"
1481 }]
1482 }"#,
1483 )
1484 .unwrap();
1485
1486 let trade = parse_ws_trade(&msg, None).unwrap();
1487 assert_eq!(trade.timestamp, 1700000000000);
1488 }
1489
1490 #[test]
1491 fn test_parse_ws_trades_multiple() {
1492 let msg = serde_json::from_str(
1493 r#"{
1494 "arg": {"channel": "trades", "instId": "BTC-USDT"},
1495 "data": [
1496 {
1497 "instId": "BTC-USDT",
1498 "tradeId": "123456789",
1499 "px": "50000.00",
1500 "sz": "0.5",
1501 "side": "buy",
1502 "ts": "1700000000000"
1503 },
1504 {
1505 "instId": "BTC-USDT",
1506 "tradeId": "123456790",
1507 "px": "50001.00",
1508 "sz": "1.0",
1509 "side": "sell",
1510 "ts": "1700000000001"
1511 }
1512 ]
1513 }"#,
1514 )
1515 .unwrap();
1516
1517 let trades = parse_ws_trades(&msg, None).unwrap();
1518 assert_eq!(trades.len(), 2);
1519 }
1520
1521 #[test]
1522 fn test_parse_ws_trade_missing_data() {
1523 let msg = serde_json::from_str(
1524 r#"{
1525 "arg": {"channel": "trades", "instId": "BTC-USDT"}
1526 }"#,
1527 )
1528 .unwrap();
1529
1530 let result = parse_ws_trade(&msg, None);
1531 assert!(result.is_err());
1532 }
1533
1534 #[test]
1535 fn test_parse_ws_trades_empty_array() {
1536 let msg = serde_json::from_str(
1537 r#"{
1538 "arg": {"channel": "trades", "instId": "BTC-USDT"},
1539 "data": []
1540 }"#,
1541 )
1542 .unwrap();
1543
1544 let trades = parse_ws_trades(&msg, None).unwrap();
1545 assert!(trades.is_empty());
1546 }
1547}