1use crate::bitget::auth::BitgetAuth;
8use crate::bitget::parser::{parse_orderbook, parse_ticker, parse_trade};
9use ccxt_core::error::{Error, Result};
10use ccxt_core::types::{
11 Balance, BalanceEntry, Fee, Market, Order, OrderBook, OrderSide, OrderStatus, OrderType,
12 Ticker, Trade,
13 financial::{Amount, Cost, Price},
14};
15use ccxt_core::ws_client::{WsClient, WsConfig, WsConnectionState};
16use ccxt_core::ws_exchange::MessageStream;
17use futures::Stream;
18use rust_decimal::Decimal;
19use rust_decimal::prelude::FromStr;
20use serde_json::Value;
21use std::collections::HashMap;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::task::{Context, Poll};
25use tokio::sync::{RwLock, mpsc};
26use tracing::{debug, warn};
27
28const DEFAULT_PING_INTERVAL_MS: u64 = 30000;
30
31const DEFAULT_RECONNECT_INTERVAL_MS: u64 = 5000;
33
34const MAX_RECONNECT_ATTEMPTS: u32 = 10;
36
37pub struct BitgetWs {
41 client: Arc<WsClient>,
43 subscriptions: Arc<RwLock<Vec<String>>>,
45}
46
47impl BitgetWs {
48 pub fn new(url: String) -> Self {
54 let config = WsConfig {
55 url: url.clone(),
56 connect_timeout: 10000,
57 ping_interval: DEFAULT_PING_INTERVAL_MS,
58 reconnect_interval: DEFAULT_RECONNECT_INTERVAL_MS,
59 max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
60 auto_reconnect: true,
61 enable_compression: false,
62 pong_timeout: 90000,
63 ..Default::default()
64 };
65
66 Self {
67 client: Arc::new(WsClient::new(config)),
68 subscriptions: Arc::new(RwLock::new(Vec::new())),
69 }
70 }
71
72 pub async fn connect(&self) -> Result<()> {
74 self.client.connect().await
75 }
76
77 pub async fn disconnect(&self) -> Result<()> {
79 self.client.disconnect().await
80 }
81
82 pub fn state(&self) -> WsConnectionState {
84 self.client.state()
85 }
86
87 pub fn is_connected(&self) -> bool {
89 self.client.is_connected()
90 }
91
92 pub async fn receive(&self) -> Option<Value> {
94 self.client.receive().await
95 }
96
97 pub async fn subscribe_ticker(&self, symbol: &str) -> Result<()> {
103 let mut arg_map = serde_json::Map::new();
104 arg_map.insert(
105 "instType".to_string(),
106 serde_json::Value::String("SPOT".to_string()),
107 );
108 arg_map.insert(
109 "channel".to_string(),
110 serde_json::Value::String("ticker".to_string()),
111 );
112 arg_map.insert(
113 "instId".to_string(),
114 serde_json::Value::String(symbol.to_string()),
115 );
116 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
117
118 let mut msg_map = serde_json::Map::new();
119 msg_map.insert(
120 "op".to_string(),
121 serde_json::Value::String("subscribe".to_string()),
122 );
123 msg_map.insert("args".to_string(), args);
124 let msg = serde_json::Value::Object(msg_map);
125
126 self.client.send_json(&msg).await?;
127
128 let sub_key = format!("ticker:{}", symbol);
129 self.subscriptions.write().await.push(sub_key);
130
131 Ok(())
132 }
133
134 pub async fn subscribe_tickers(&self, symbols: &[String]) -> Result<()> {
140 let mut args = Vec::new();
141 for symbol in symbols {
142 let mut arg_map = serde_json::Map::new();
143 arg_map.insert(
144 "instType".to_string(),
145 serde_json::Value::String("SPOT".to_string()),
146 );
147 arg_map.insert(
148 "channel".to_string(),
149 serde_json::Value::String("ticker".to_string()),
150 );
151 arg_map.insert(
152 "instId".to_string(),
153 serde_json::Value::String(symbol.clone()),
154 );
155 args.push(serde_json::Value::Object(arg_map));
156 }
157
158 let mut msg_map = serde_json::Map::new();
159 msg_map.insert(
160 "op".to_string(),
161 serde_json::Value::String("subscribe".to_string()),
162 );
163 msg_map.insert("args".to_string(), serde_json::Value::Array(args));
164 let msg = serde_json::Value::Object(msg_map);
165
166 self.client.send_json(&msg).await?;
167
168 let mut subs = self.subscriptions.write().await;
169 for symbol in symbols {
170 subs.push(format!("ticker:{}", symbol));
171 }
172
173 Ok(())
174 }
175
176 pub async fn watch_tickers(&self, symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>> {
188 if !self.is_connected() {
190 self.connect().await?;
191 }
192
193 self.subscribe_tickers(symbols).await?;
195
196 let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Ticker>>>();
198 let symbols_owned: Vec<String> = symbols.to_vec();
199 let client = Arc::clone(&self.client);
200
201 tokio::spawn(async move {
203 while let Some(msg) = client.receive().await {
204 if let Some(arg) = msg.get("arg") {
206 let channel = arg.get("channel").and_then(|c| c.as_str());
207 let inst_id = arg.get("instId").and_then(|i| i.as_str());
208
209 if channel == Some("ticker") {
210 if let Some(id) = inst_id {
211 if symbols_owned.iter().any(|s| s == id) {
212 match parse_ws_ticker(&msg, None) {
213 Ok(ticker) => {
214 if tx.send(Ok(vec![ticker])).is_err() {
215 break; }
217 }
218 Err(e) => {
219 if tx.send(Err(e)).is_err() {
220 break;
221 }
222 }
223 }
224 }
225 }
226 }
227 }
228 }
229 });
230
231 Ok(Box::pin(ReceiverStream::new(rx)))
232 }
233 pub async fn subscribe_orderbook(&self, symbol: &str, depth: u32) -> Result<()> {
235 let channel = match depth {
236 5 => "books5",
237 15 => "books15",
238 _ => "books",
239 };
240
241 let mut arg_map = serde_json::Map::new();
242 arg_map.insert(
243 "instType".to_string(),
244 serde_json::Value::String("SPOT".to_string()),
245 );
246 arg_map.insert(
247 "channel".to_string(),
248 serde_json::Value::String(channel.to_string()),
249 );
250 arg_map.insert(
251 "instId".to_string(),
252 serde_json::Value::String(symbol.to_string()),
253 );
254 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
255
256 let mut msg_map = serde_json::Map::new();
257 msg_map.insert(
258 "op".to_string(),
259 serde_json::Value::String("subscribe".to_string()),
260 );
261 msg_map.insert("args".to_string(), args);
262 let msg = serde_json::Value::Object(msg_map);
263
264 self.client.send_json(&msg).await?;
265
266 let sub_key = format!("orderbook:{}", symbol);
267 self.subscriptions.write().await.push(sub_key);
268
269 Ok(())
270 }
271
272 pub async fn subscribe_trades(&self, symbol: &str) -> Result<()> {
278 let mut arg_map = serde_json::Map::new();
279 arg_map.insert(
280 "instType".to_string(),
281 serde_json::Value::String("SPOT".to_string()),
282 );
283 arg_map.insert(
284 "channel".to_string(),
285 serde_json::Value::String("trade".to_string()),
286 );
287 arg_map.insert(
288 "instId".to_string(),
289 serde_json::Value::String(symbol.to_string()),
290 );
291 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
292
293 let mut msg_map = serde_json::Map::new();
294 msg_map.insert(
295 "op".to_string(),
296 serde_json::Value::String("subscribe".to_string()),
297 );
298 msg_map.insert("args".to_string(), args);
299 let msg = serde_json::Value::Object(msg_map);
300
301 self.client.send_json(&msg).await?;
302
303 let sub_key = format!("trades:{}", symbol);
304 self.subscriptions.write().await.push(sub_key);
305
306 Ok(())
307 }
308
309 pub async fn subscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
316 let channel = format!("candle{}", interval);
317
318 let mut arg_map = serde_json::Map::new();
319 arg_map.insert(
320 "instType".to_string(),
321 serde_json::Value::String("SPOT".to_string()),
322 );
323 arg_map.insert(
324 "channel".to_string(),
325 serde_json::Value::String(channel.clone()),
326 );
327 arg_map.insert(
328 "instId".to_string(),
329 serde_json::Value::String(symbol.to_string()),
330 );
331 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
332
333 let mut msg_map = serde_json::Map::new();
334 msg_map.insert(
335 "op".to_string(),
336 serde_json::Value::String("subscribe".to_string()),
337 );
338 msg_map.insert("args".to_string(), args);
339 let msg = serde_json::Value::Object(msg_map);
340
341 self.client.send_json(&msg).await?;
342
343 let sub_key = format!("kline:{}:{}", symbol, interval);
344 self.subscriptions.write().await.push(sub_key);
345
346 Ok(())
347 }
348
349 pub async fn unsubscribe(&self, stream_name: String) -> Result<()> {
355 let parts: Vec<&str> = stream_name.split(':').collect();
357 if parts.len() < 2 {
358 return Err(Error::invalid_request(format!(
359 "Invalid stream name: {}",
360 stream_name
361 )));
362 }
363
364 let channel = parts[0];
365 let symbol = parts[1];
366
367 let bitget_channel = match channel {
368 "ticker" => "ticker",
369 "orderbook" => "books",
370 "trades" => "trade",
371 "kline" => {
372 if parts.len() >= 3 {
373 return self.unsubscribe_kline(symbol, parts[2]).await;
375 }
376 return Err(Error::invalid_request(
377 "Kline unsubscribe requires interval",
378 ));
379 }
380 _ => channel,
381 };
382
383 let mut arg_map = serde_json::Map::new();
384 arg_map.insert(
385 "instType".to_string(),
386 serde_json::Value::String("SPOT".to_string()),
387 );
388 arg_map.insert(
389 "channel".to_string(),
390 serde_json::Value::String(bitget_channel.to_string()),
391 );
392 arg_map.insert(
393 "instId".to_string(),
394 serde_json::Value::String(symbol.to_string()),
395 );
396 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
397
398 let mut msg_map = serde_json::Map::new();
399 msg_map.insert(
400 "op".to_string(),
401 serde_json::Value::String("unsubscribe".to_string()),
402 );
403 msg_map.insert("args".to_string(), args);
404 let msg = serde_json::Value::Object(msg_map);
405
406 self.client.send_json(&msg).await?;
407
408 let mut subs = self.subscriptions.write().await;
410 subs.retain(|s| s != &stream_name);
411
412 Ok(())
413 }
414
415 async fn unsubscribe_kline(&self, symbol: &str, interval: &str) -> Result<()> {
417 let channel = format!("candle{}", interval);
418
419 let mut arg_map = serde_json::Map::new();
420 arg_map.insert(
421 "instType".to_string(),
422 serde_json::Value::String("SPOT".to_string()),
423 );
424 arg_map.insert(
425 "channel".to_string(),
426 serde_json::Value::String(channel.clone()),
427 );
428 arg_map.insert(
429 "instId".to_string(),
430 serde_json::Value::String(symbol.to_string()),
431 );
432 let args = serde_json::Value::Array(vec![serde_json::Value::Object(arg_map)]);
433
434 let mut msg_map = serde_json::Map::new();
435 msg_map.insert(
436 "op".to_string(),
437 serde_json::Value::String("unsubscribe".to_string()),
438 );
439 msg_map.insert("args".to_string(), args);
440 let msg = serde_json::Value::Object(msg_map);
441
442 self.client.send_json(&msg).await?;
443
444 let sub_key = format!("kline:{}:{}", symbol, interval);
445 let mut subs = self.subscriptions.write().await;
446 subs.retain(|s| s != &sub_key);
447
448 Ok(())
449 }
450
451 pub async fn subscriptions(&self) -> Vec<String> {
453 self.subscriptions.read().await.clone()
454 }
455
456 pub async fn login(&self, auth: &BitgetAuth) -> Result<()> {
463 let timestamp = (chrono::Utc::now().timestamp_millis() / 1000).to_string();
464 let sign = auth.sign(×tamp, "GET", "/user/verify", "");
465
466 #[allow(clippy::disallowed_methods)]
467 let msg = serde_json::json!({
468 "op": "login",
469 "args": [{
470 "apiKey": auth.api_key(),
471 "passphrase": auth.passphrase(),
472 "timestamp": timestamp,
473 "sign": sign
474 }]
475 });
476
477 self.client.send_json(&msg).await?;
478
479 let timeout = tokio::time::Duration::from_secs(10);
481 let start = tokio::time::Instant::now();
482
483 while start.elapsed() < timeout {
484 if let Some(resp) = self.client.receive().await {
485 if let Some(event) = resp.get("event").and_then(|e| e.as_str()) {
486 if event == "login" {
487 let code = resp.get("code").and_then(|c| c.as_str()).unwrap_or("1");
488 if code == "0" {
489 debug!("Bitget WebSocket login successful");
490 return Ok(());
491 }
492 let msg_text = resp
493 .get("msg")
494 .and_then(|m| m.as_str())
495 .unwrap_or("Unknown error");
496 return Err(Error::authentication(format!(
497 "Bitget WebSocket login failed: {} (code: {})",
498 msg_text, code
499 )));
500 }
501 }
502 }
503 }
504
505 Err(Error::authentication(
506 "Bitget WebSocket login timed out waiting for response",
507 ))
508 }
509
510 pub async fn subscribe_orders(&self, inst_type: Option<&str>) -> Result<()> {
514 let inst_type = inst_type.unwrap_or("SPOT");
515
516 #[allow(clippy::disallowed_methods)]
517 let msg = serde_json::json!({
518 "op": "subscribe",
519 "args": [{
520 "instType": inst_type,
521 "channel": "orders",
522 "instId": "default"
523 }]
524 });
525
526 self.client.send_json(&msg).await?;
527 self.subscriptions.write().await.push("orders".to_string());
528 Ok(())
529 }
530
531 pub async fn subscribe_account(&self) -> Result<()> {
535 #[allow(clippy::disallowed_methods)]
536 let msg = serde_json::json!({
537 "op": "subscribe",
538 "args": [{
539 "instType": "SPOT",
540 "channel": "account",
541 "coin": "default"
542 }]
543 });
544
545 self.client.send_json(&msg).await?;
546 self.subscriptions.write().await.push("account".to_string());
547 Ok(())
548 }
549
550 pub async fn watch_balance(&self) -> Result<MessageStream<Balance>> {
554 if !self.is_connected() {
555 self.connect().await?;
556 }
557
558 self.subscribe_account().await?;
559
560 let (tx, rx) = mpsc::unbounded_channel::<Result<Balance>>();
561 let client = Arc::clone(&self.client);
562
563 tokio::spawn(async move {
564 while let Some(msg) = client.receive().await {
565 if is_account_message(&msg) {
566 match parse_ws_balance(&msg) {
567 Ok(balance) => {
568 if tx.send(Ok(balance)).is_err() {
569 break;
570 }
571 }
572 Err(e) => {
573 warn!(error = %e, "Failed to parse Bitget balance update");
574 if tx.send(Err(e)).is_err() {
575 break;
576 }
577 }
578 }
579 }
580 }
581 });
582
583 Ok(Box::pin(ReceiverStream::new(rx)))
584 }
585
586 pub async fn watch_orders(
590 &self,
591 symbol_filter: Option<String>,
592 ) -> Result<MessageStream<Order>> {
593 if !self.is_connected() {
594 self.connect().await?;
595 }
596
597 self.subscribe_orders(None).await?;
598
599 let (tx, rx) = mpsc::unbounded_channel::<Result<Order>>();
600 let client = Arc::clone(&self.client);
601
602 tokio::spawn(async move {
603 while let Some(msg) = client.receive().await {
604 if is_orders_message(&msg) {
605 if let Some(data_array) = msg.get("data").and_then(|d| d.as_array()) {
606 for data in data_array {
607 if let Some(ref filter) = symbol_filter {
609 let inst_id =
610 data.get("instId").and_then(|i| i.as_str()).unwrap_or("");
611 let unified = format_unified_symbol(inst_id);
612 if unified != *filter && inst_id != filter.as_str() {
613 continue;
614 }
615 }
616
617 match parse_ws_order(data) {
618 Ok(order) => {
619 if tx.send(Ok(order)).is_err() {
620 return;
621 }
622 }
623 Err(e) => {
624 warn!(error = %e, "Failed to parse Bitget order update");
625 if tx.send(Err(e)).is_err() {
626 return;
627 }
628 }
629 }
630 }
631 }
632 }
633 }
634 });
635
636 Ok(Box::pin(ReceiverStream::new(rx)))
637 }
638
639 pub async fn watch_my_trades(
644 &self,
645 symbol_filter: Option<String>,
646 ) -> Result<MessageStream<Trade>> {
647 if !self.is_connected() {
648 self.connect().await?;
649 }
650
651 self.subscribe_orders(None).await?;
652
653 let (tx, rx) = mpsc::unbounded_channel::<Result<Trade>>();
654 let client = Arc::clone(&self.client);
655
656 tokio::spawn(async move {
657 while let Some(msg) = client.receive().await {
658 if is_orders_message(&msg) {
659 if let Some(data_array) = msg.get("data").and_then(|d| d.as_array()) {
660 for data in data_array {
661 let fill_sz = data
663 .get("fillSz")
664 .or_else(|| data.get("baseVolume"))
665 .and_then(|v| v.as_str())
666 .and_then(|s| s.parse::<f64>().ok())
667 .unwrap_or(0.0);
668
669 if fill_sz <= 0.0 {
670 continue;
671 }
672
673 if let Some(ref filter) = symbol_filter {
675 let inst_id =
676 data.get("instId").and_then(|i| i.as_str()).unwrap_or("");
677 let unified = format_unified_symbol(inst_id);
678 if unified != *filter && inst_id != filter.as_str() {
679 continue;
680 }
681 }
682
683 match parse_ws_my_trade(data) {
684 Ok(trade) => {
685 if tx.send(Ok(trade)).is_err() {
686 return;
687 }
688 }
689 Err(e) => {
690 warn!(error = %e, "Failed to parse Bitget trade update");
691 if tx.send(Err(e)).is_err() {
692 return;
693 }
694 }
695 }
696 }
697 }
698 }
699 }
700 });
701
702 Ok(Box::pin(ReceiverStream::new(rx)))
703 }
704
705 pub async fn watch_ticker(
738 &self,
739 symbol: &str,
740 market: Option<Market>,
741 ) -> Result<MessageStream<Ticker>> {
742 if !self.is_connected() {
744 self.connect().await?;
745 }
746
747 self.subscribe_ticker(symbol).await?;
749
750 let (tx, rx) = mpsc::unbounded_channel::<Result<Ticker>>();
752 let symbol_owned = symbol.to_string();
753 let client = Arc::clone(&self.client);
754
755 tokio::spawn(async move {
757 while let Some(msg) = client.receive().await {
758 if is_ticker_message(&msg, &symbol_owned) {
760 match parse_ws_ticker(&msg, market.as_ref()) {
761 Ok(ticker) => {
762 if tx.send(Ok(ticker)).is_err() {
763 break; }
765 }
766 Err(e) => {
767 if tx.send(Err(e)).is_err() {
768 break;
769 }
770 }
771 }
772 }
773 }
774 });
775
776 Ok(Box::pin(ReceiverStream::new(rx)))
777 }
778
779 pub async fn watch_order_book(
812 &self,
813 symbol: &str,
814 limit: Option<u32>,
815 ) -> Result<MessageStream<OrderBook>> {
816 if !self.is_connected() {
818 self.connect().await?;
819 }
820
821 let depth = limit.unwrap_or(15);
823 self.subscribe_orderbook(symbol, depth).await?;
824
825 let (tx, rx) = mpsc::unbounded_channel::<Result<OrderBook>>();
827 let symbol_owned = symbol.to_string();
828 let unified_symbol = format_unified_symbol(&symbol_owned);
829 let client = Arc::clone(&self.client);
830
831 tokio::spawn(async move {
833 while let Some(msg) = client.receive().await {
834 if is_orderbook_message(&msg, &symbol_owned) {
836 match parse_ws_orderbook(&msg, unified_symbol.clone()) {
837 Ok(orderbook) => {
838 if tx.send(Ok(orderbook)).is_err() {
839 break; }
841 }
842 Err(e) => {
843 if tx.send(Err(e)).is_err() {
844 break;
845 }
846 }
847 }
848 }
849 }
850 });
851
852 Ok(Box::pin(ReceiverStream::new(rx)))
853 }
854
855 pub async fn watch_trades(
892 &self,
893 symbol: &str,
894 market: Option<Market>,
895 ) -> Result<MessageStream<Vec<Trade>>> {
896 if !self.is_connected() {
898 self.connect().await?;
899 }
900
901 self.subscribe_trades(symbol).await?;
903
904 let (tx, rx) = mpsc::unbounded_channel::<Result<Vec<Trade>>>();
906 let symbol_owned = symbol.to_string();
907 let client = Arc::clone(&self.client);
908
909 tokio::spawn(async move {
911 while let Some(msg) = client.receive().await {
912 if is_trade_message(&msg, &symbol_owned) {
914 match parse_ws_trades(&msg, market.as_ref()) {
915 Ok(trades) => {
916 if tx.send(Ok(trades)).is_err() {
917 break; }
919 }
920 Err(e) => {
921 if tx.send(Err(e)).is_err() {
922 break;
923 }
924 }
925 }
926 }
927 }
928 });
929
930 Ok(Box::pin(ReceiverStream::new(rx)))
931 }
932}
933
934struct ReceiverStream<T> {
940 receiver: mpsc::UnboundedReceiver<T>,
941}
942
943impl<T> ReceiverStream<T> {
944 fn new(receiver: mpsc::UnboundedReceiver<T>) -> Self {
945 Self { receiver }
946 }
947}
948
949impl<T> Stream for ReceiverStream<T> {
950 type Item = T;
951
952 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
953 self.receiver.poll_recv(cx)
954 }
955}
956
957fn is_ticker_message(msg: &Value, symbol: &str) -> bool {
963 if let Some(arg) = msg.get("arg") {
964 let channel = arg.get("channel").and_then(|c| c.as_str());
965 let inst_id = arg.get("instId").and_then(|i| i.as_str());
966
967 channel == Some("ticker") && inst_id == Some(symbol)
968 } else {
969 false
970 }
971}
972
973fn is_orderbook_message(msg: &Value, symbol: &str) -> bool {
975 if let Some(arg) = msg.get("arg") {
976 let channel = arg.get("channel").and_then(|c| c.as_str());
977 let inst_id = arg.get("instId").and_then(|i| i.as_str());
978
979 let is_orderbook_channel = channel.is_some_and(|c| c.starts_with("books"));
981 is_orderbook_channel && inst_id == Some(symbol)
982 } else {
983 false
984 }
985}
986
987fn is_trade_message(msg: &Value, symbol: &str) -> bool {
989 if let Some(arg) = msg.get("arg") {
990 let channel = arg.get("channel").and_then(|c| c.as_str());
991 let inst_id = arg.get("instId").and_then(|i| i.as_str());
992
993 channel == Some("trade") && inst_id == Some(symbol)
994 } else {
995 false
996 }
997}
998
999fn is_account_message(msg: &Value) -> bool {
1005 if let Some(arg) = msg.get("arg") {
1006 arg.get("channel").and_then(|c| c.as_str()) == Some("account")
1007 } else {
1008 false
1009 }
1010}
1011
1012fn is_orders_message(msg: &Value) -> bool {
1014 if let Some(arg) = msg.get("arg") {
1015 arg.get("channel").and_then(|c| c.as_str()) == Some("orders")
1016 } else {
1017 false
1018 }
1019}
1020
1021fn format_unified_symbol(symbol: &str) -> String {
1023 let quote_currencies = ["USDT", "USDC", "BTC", "ETH", "EUR", "USD"];
1025
1026 for quote in "e_currencies {
1027 if let Some(base) = symbol.strip_suffix(quote) {
1028 if !base.is_empty() {
1029 return format!("{}/{}", base, quote);
1030 }
1031 }
1032 }
1033
1034 symbol.to_string()
1036}
1037
1038pub fn parse_ws_ticker(msg: &Value, market: Option<&Market>) -> Result<Ticker> {
1044 let data = msg
1047 .get("data")
1048 .and_then(|d| d.as_array())
1049 .and_then(|arr| arr.first())
1050 .ok_or_else(|| Error::invalid_request("Missing data in ticker message"))?;
1051
1052 parse_ticker(data, market)
1053}
1054
1055pub fn parse_ws_orderbook(msg: &Value, symbol: String) -> Result<OrderBook> {
1057 let data = msg
1060 .get("data")
1061 .and_then(|d| d.as_array())
1062 .and_then(|arr| arr.first())
1063 .ok_or_else(|| Error::invalid_request("Missing data in orderbook message"))?;
1064
1065 parse_orderbook(data, symbol)
1066}
1067
1068pub fn parse_ws_trade(msg: &Value, market: Option<&Market>) -> Result<Trade> {
1070 let data = msg
1073 .get("data")
1074 .and_then(|d| d.as_array())
1075 .and_then(|arr| arr.first())
1076 .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
1077
1078 parse_trade(data, market)
1079}
1080
1081pub fn parse_ws_trades(msg: &Value, market: Option<&Market>) -> Result<Vec<Trade>> {
1083 let data_array = msg
1086 .get("data")
1087 .and_then(|d| d.as_array())
1088 .ok_or_else(|| Error::invalid_request("Missing data in trade message"))?;
1089
1090 let mut trades = Vec::with_capacity(data_array.len());
1091 for data in data_array {
1092 trades.push(parse_trade(data, market)?);
1093 }
1094
1095 Ok(trades)
1096}
1097
1098pub fn parse_ws_balance(msg: &Value) -> Result<Balance> {
1117 let data_array = msg
1118 .get("data")
1119 .and_then(|d| d.as_array())
1120 .ok_or_else(|| Error::invalid_request("Missing data in account message"))?;
1121
1122 let mut balances = HashMap::new();
1123
1124 for data in data_array {
1125 let currency = data
1127 .get("coin")
1128 .and_then(|c| c.as_str())
1129 .unwrap_or_default()
1130 .to_string();
1131
1132 if currency.is_empty() {
1133 continue;
1134 }
1135
1136 let free = data
1137 .get("available")
1138 .and_then(|v| v.as_str())
1139 .and_then(|s| Decimal::from_str(s).ok())
1140 .unwrap_or_default();
1141
1142 let used = data
1143 .get("frozen")
1144 .and_then(|v| v.as_str())
1145 .and_then(|s| Decimal::from_str(s).ok())
1146 .unwrap_or_default();
1147
1148 let total = free + used;
1149
1150 balances.insert(currency, BalanceEntry { free, used, total });
1151 }
1152
1153 Ok(Balance {
1154 balances,
1155 info: HashMap::new(),
1156 })
1157}
1158
1159pub fn parse_ws_order(data: &Value) -> Result<Order> {
1182 let inst_id = data
1183 .get("instId")
1184 .and_then(|v| v.as_str())
1185 .unwrap_or_default();
1186 let symbol = format_unified_symbol(inst_id);
1187
1188 let id = data
1189 .get("ordId")
1190 .and_then(|v| v.as_str())
1191 .unwrap_or_default()
1192 .to_string();
1193
1194 let client_order_id = data
1195 .get("clOrdId")
1196 .and_then(|v| v.as_str())
1197 .map(ToString::to_string);
1198
1199 let side = match data.get("side").and_then(|v| v.as_str()) {
1200 Some("sell") => OrderSide::Sell,
1201 _ => OrderSide::Buy,
1202 };
1203
1204 let order_type = match data.get("ordType").and_then(|v| v.as_str()) {
1205 Some("market") => OrderType::Market,
1206 _ => OrderType::Limit,
1207 };
1208
1209 let price = data
1210 .get("px")
1211 .or_else(|| data.get("price"))
1212 .and_then(|v| v.as_str())
1213 .and_then(|s| Decimal::from_str(s).ok());
1214
1215 let amount = data
1216 .get("sz")
1217 .or_else(|| data.get("size"))
1218 .and_then(|v| v.as_str())
1219 .and_then(|s| Decimal::from_str(s).ok())
1220 .unwrap_or_default();
1221
1222 let filled = data
1223 .get("accFillSz")
1224 .or_else(|| data.get("baseVolume"))
1225 .and_then(|v| v.as_str())
1226 .and_then(|s| Decimal::from_str(s).ok());
1227
1228 let average = data
1229 .get("avgPx")
1230 .and_then(|v| v.as_str())
1231 .and_then(|s| Decimal::from_str(s).ok())
1232 .filter(|d| !d.is_zero());
1233
1234 let cost = match (filled, average) {
1235 (Some(f), Some(a)) => Some(f * a),
1236 _ => None,
1237 };
1238
1239 let status = match data.get("status").and_then(|v| v.as_str()) {
1240 Some("filled" | "full-fill") => OrderStatus::Closed,
1241 Some("cancelled" | "canceled") => OrderStatus::Cancelled,
1242 _ => OrderStatus::Open,
1243 };
1244
1245 let fee = {
1246 let fee_amount = data
1247 .get("fee")
1248 .or_else(|| data.get("feeDetail"))
1249 .and_then(|v| v.as_str())
1250 .and_then(|s| Decimal::from_str(s).ok());
1251 let fee_currency = data
1252 .get("feeCcy")
1253 .and_then(|v| v.as_str())
1254 .unwrap_or_default()
1255 .to_string();
1256 fee_amount.map(|f| Fee::new(fee_currency, f.abs()))
1257 };
1258
1259 let timestamp = data
1260 .get("cTime")
1261 .and_then(|v| v.as_str())
1262 .and_then(|s| s.parse::<i64>().ok());
1263
1264 Ok(Order {
1265 id,
1266 client_order_id,
1267 symbol,
1268 order_type,
1269 side,
1270 price,
1271 amount,
1272 filled,
1273 remaining: None,
1274 cost,
1275 average,
1276 status,
1277 fee,
1278 fees: None,
1279 timestamp,
1280 datetime: None,
1281 last_trade_timestamp: None,
1282 time_in_force: None,
1283 post_only: None,
1284 reduce_only: None,
1285 stop_price: None,
1286 trigger_price: None,
1287 take_profit_price: None,
1288 stop_loss_price: None,
1289 trailing_delta: None,
1290 trailing_percent: None,
1291 activation_price: None,
1292 callback_rate: None,
1293 working_type: None,
1294 trades: None,
1295 info: HashMap::new(),
1296 })
1297}
1298
1299pub fn parse_ws_my_trade(data: &Value) -> Result<Trade> {
1303 let inst_id = data
1304 .get("instId")
1305 .and_then(|v| v.as_str())
1306 .unwrap_or_default();
1307 let symbol = format_unified_symbol(inst_id);
1308
1309 let trade_id = data
1310 .get("tradeId")
1311 .and_then(|v| v.as_str())
1312 .map(ToString::to_string);
1313
1314 let order_id = data
1315 .get("ordId")
1316 .and_then(|v| v.as_str())
1317 .map(ToString::to_string);
1318
1319 let side = match data.get("side").and_then(|v| v.as_str()) {
1320 Some("buy") => OrderSide::Buy,
1321 _ => OrderSide::Sell,
1322 };
1323
1324 let fill_px = data
1325 .get("fillPx")
1326 .or_else(|| data.get("price"))
1327 .and_then(|v| v.as_str())
1328 .and_then(|s| Decimal::from_str(s).ok())
1329 .unwrap_or_default();
1330
1331 let fill_sz = data
1332 .get("fillSz")
1333 .or_else(|| data.get("baseVolume"))
1334 .and_then(|v| v.as_str())
1335 .and_then(|s| Decimal::from_str(s).ok())
1336 .unwrap_or_default();
1337
1338 let timestamp = data
1339 .get("fillTime")
1340 .or_else(|| data.get("uTime"))
1341 .or_else(|| data.get("cTime"))
1342 .and_then(|v| v.as_str())
1343 .and_then(|s| s.parse::<i64>().ok())
1344 .unwrap_or(0);
1345
1346 let fee = {
1347 let fee_amount = data
1348 .get("fillFee")
1349 .or_else(|| data.get("fee"))
1350 .and_then(|v| v.as_str())
1351 .and_then(|s| Decimal::from_str(s).ok());
1352 let fee_currency = data
1353 .get("fillFeeCcy")
1354 .or_else(|| data.get("feeCcy"))
1355 .and_then(|v| v.as_str())
1356 .unwrap_or_default()
1357 .to_string();
1358 fee_amount.map(|f| Fee::new(fee_currency, f.abs()))
1359 };
1360
1361 Ok(Trade {
1362 id: trade_id,
1363 order: order_id,
1364 symbol,
1365 trade_type: None,
1366 side,
1367 taker_or_maker: None,
1368 price: Price(fill_px),
1369 amount: Amount(fill_sz),
1370 cost: Some(Cost(fill_px * fill_sz)),
1371 fee,
1372 timestamp,
1373 datetime: None,
1374 info: HashMap::new(),
1375 })
1376}
1377
1378#[cfg(test)]
1379mod tests {
1380 use super::*;
1381 use ccxt_core::types::financial::Price;
1382 use rust_decimal_macros::dec;
1383
1384 #[test]
1385 fn test_bitget_ws_creation() {
1386 let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
1387 assert!(ws.subscriptions.try_read().is_ok());
1389 }
1390
1391 #[tokio::test]
1392 async fn test_subscriptions_empty_by_default() {
1393 let ws = BitgetWs::new("wss://ws.bitget.com/v2/ws/public".to_string());
1394 let subs = ws.subscriptions().await;
1395 assert!(subs.is_empty());
1396 }
1397
1398 #[test]
1401 fn test_parse_ws_ticker_snapshot() {
1402 let msg = serde_json::from_str(
1403 r#"{
1404 "action": "snapshot",
1405 "arg": {
1406 "instType": "SPOT",
1407 "channel": "ticker",
1408 "instId": "BTCUSDT"
1409 },
1410 "data": [{
1411 "instId": "BTCUSDT",
1412 "lastPr": "50000.00",
1413 "high24h": "51000.00",
1414 "low24h": "49000.00",
1415 "bidPr": "49999.00",
1416 "askPr": "50001.00",
1417 "baseVolume": "1000.5",
1418 "ts": "1700000000000"
1419 }]
1420 }"#,
1421 )
1422 .unwrap();
1423
1424 let ticker = parse_ws_ticker(&msg, None).unwrap();
1425 assert_eq!(ticker.symbol, "BTCUSDT");
1426 assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
1427 assert_eq!(ticker.high, Some(Price::new(dec!(51000.00))));
1428 assert_eq!(ticker.low, Some(Price::new(dec!(49000.00))));
1429 assert_eq!(ticker.bid, Some(Price::new(dec!(49999.00))));
1430 assert_eq!(ticker.ask, Some(Price::new(dec!(50001.00))));
1431 assert_eq!(ticker.timestamp, 1700000000000);
1432 }
1433
1434 #[test]
1435 fn test_parse_ws_ticker_with_market() {
1436 let msg = serde_json::from_str(
1437 r#"{
1438 "action": "snapshot",
1439 "arg": {
1440 "instType": "SPOT",
1441 "channel": "ticker",
1442 "instId": "BTCUSDT"
1443 },
1444 "data": [{
1445 "instId": "BTCUSDT",
1446 "lastPr": "50000.00",
1447 "ts": "1700000000000"
1448 }]
1449 }"#,
1450 )
1451 .unwrap();
1452
1453 let market = Market {
1454 id: "BTCUSDT".to_string(),
1455 symbol: "BTC/USDT".to_string(),
1456 base: "BTC".to_string(),
1457 quote: "USDT".to_string(),
1458 ..Default::default()
1459 };
1460
1461 let ticker = parse_ws_ticker(&msg, Some(&market)).unwrap();
1462 assert_eq!(ticker.symbol, "BTC/USDT");
1463 assert_eq!(ticker.last, Some(Price::new(dec!(50000.00))));
1464 }
1465
1466 #[test]
1467 fn test_parse_ws_ticker_missing_data() {
1468 let msg = serde_json::from_str(
1469 r#"{
1470 "action": "snapshot",
1471 "arg": {
1472 "instType": "SPOT",
1473 "channel": "ticker",
1474 "instId": "BTCUSDT"
1475 }
1476 }"#,
1477 )
1478 .unwrap();
1479
1480 let result = parse_ws_ticker(&msg, None);
1481 assert!(result.is_err());
1482 }
1483
1484 #[test]
1485 fn test_parse_ws_ticker_empty_data_array() {
1486 let msg = serde_json::from_str(
1487 r#"{
1488 "action": "snapshot",
1489 "arg": {
1490 "instType": "SPOT",
1491 "channel": "ticker",
1492 "instId": "BTCUSDT"
1493 },
1494 "data": []
1495 }"#,
1496 )
1497 .unwrap();
1498
1499 let result = parse_ws_ticker(&msg, None);
1500 assert!(result.is_err());
1501 }
1502
1503 #[test]
1506 fn test_parse_ws_orderbook_snapshot() {
1507 let msg = serde_json::from_str(
1508 r#"{
1509 "action": "snapshot",
1510 "arg": {
1511 "instType": "SPOT",
1512 "channel": "books5",
1513 "instId": "BTCUSDT"
1514 },
1515 "data": [{
1516 "bids": [
1517 ["50000.00", "1.5"],
1518 ["49999.00", "2.0"],
1519 ["49998.00", "0.5"]
1520 ],
1521 "asks": [
1522 ["50001.00", "1.0"],
1523 ["50002.00", "3.0"],
1524 ["50003.00", "2.5"]
1525 ],
1526 "ts": "1700000000000"
1527 }]
1528 }"#,
1529 )
1530 .unwrap();
1531
1532 let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
1533 assert_eq!(orderbook.symbol, "BTC/USDT");
1534 assert_eq!(orderbook.bids.len(), 3);
1535 assert_eq!(orderbook.asks.len(), 3);
1536
1537 assert_eq!(orderbook.bids[0].price, Price::new(dec!(50000.00)));
1539 assert_eq!(orderbook.bids[1].price, Price::new(dec!(49999.00)));
1540 assert_eq!(orderbook.bids[2].price, Price::new(dec!(49998.00)));
1541
1542 assert_eq!(orderbook.asks[0].price, Price::new(dec!(50001.00)));
1544 assert_eq!(orderbook.asks[1].price, Price::new(dec!(50002.00)));
1545 assert_eq!(orderbook.asks[2].price, Price::new(dec!(50003.00)));
1546 }
1547
1548 #[test]
1549 fn test_parse_ws_orderbook_update() {
1550 let msg = serde_json::from_str(
1551 r#"{
1552 "action": "update",
1553 "arg": {
1554 "instType": "SPOT",
1555 "channel": "books",
1556 "instId": "ETHUSDT"
1557 },
1558 "data": [{
1559 "bids": [
1560 ["2000.00", "10.0"]
1561 ],
1562 "asks": [
1563 ["2001.00", "5.0"]
1564 ],
1565 "ts": "1700000000001"
1566 }]
1567 }"#,
1568 )
1569 .unwrap();
1570
1571 let orderbook = parse_ws_orderbook(&msg, "ETH/USDT".to_string()).unwrap();
1572 assert_eq!(orderbook.symbol, "ETH/USDT");
1573 assert_eq!(orderbook.bids.len(), 1);
1574 assert_eq!(orderbook.asks.len(), 1);
1575 assert_eq!(orderbook.timestamp, 1700000000001);
1576 }
1577
1578 #[test]
1579 fn test_parse_ws_orderbook_missing_data() {
1580 let msg = serde_json::from_str(
1581 r#"{
1582 "action": "snapshot",
1583 "arg": {
1584 "instType": "SPOT",
1585 "channel": "books5",
1586 "instId": "BTCUSDT"
1587 }
1588 }"#,
1589 )
1590 .unwrap();
1591
1592 let result = parse_ws_orderbook(&msg, "BTC/USDT".to_string());
1593 assert!(result.is_err());
1594 }
1595
1596 #[test]
1597 fn test_parse_ws_orderbook_empty_sides() {
1598 let msg = serde_json::from_str(
1599 r#"{
1600 "action": "snapshot",
1601 "arg": {
1602 "instType": "SPOT",
1603 "channel": "books5",
1604 "instId": "BTCUSDT"
1605 },
1606 "data": [{
1607 "bids": [],
1608 "asks": [],
1609 "ts": "1700000000000"
1610 }]
1611 }"#,
1612 )
1613 .unwrap();
1614
1615 let orderbook = parse_ws_orderbook(&msg, "BTC/USDT".to_string()).unwrap();
1616 assert!(orderbook.bids.is_empty());
1617 assert!(orderbook.asks.is_empty());
1618 }
1619
1620 #[test]
1623 fn test_parse_ws_trade_single() {
1624 let msg = serde_json::from_str(
1625 r#"{
1626 "action": "snapshot",
1627 "arg": {
1628 "instType": "SPOT",
1629 "channel": "trade",
1630 "instId": "BTCUSDT"
1631 },
1632 "data": [{
1633 "tradeId": "123456789",
1634 "symbol": "BTCUSDT",
1635 "side": "buy",
1636 "price": "50000.00",
1637 "size": "0.5",
1638 "ts": "1700000000000"
1639 }]
1640 }"#,
1641 )
1642 .unwrap();
1643
1644 let trade = parse_ws_trade(&msg, None).unwrap();
1645 assert_eq!(trade.id, Some("123456789".to_string()));
1646 assert_eq!(trade.side, ccxt_core::types::OrderSide::Buy);
1647 assert_eq!(trade.price, Price::new(dec!(50000.00)));
1648 assert_eq!(
1649 trade.amount,
1650 ccxt_core::types::financial::Amount::new(dec!(0.5))
1651 );
1652 assert_eq!(trade.timestamp, 1700000000000);
1653 }
1654
1655 #[test]
1656 fn test_parse_ws_trades_multiple() {
1657 let msg = serde_json::from_str(
1658 r#"{
1659 "action": "snapshot",
1660 "arg": {
1661 "instType": "SPOT",
1662 "channel": "trade",
1663 "instId": "BTCUSDT"
1664 },
1665 "data": [
1666 {
1667 "tradeId": "123456789",
1668 "symbol": "BTCUSDT",
1669 "side": "buy",
1670 "price": "50000.00",
1671 "size": "0.5",
1672 "ts": "1700000000000"
1673 },
1674 {
1675 "tradeId": "123456790",
1676 "symbol": "BTCUSDT",
1677 "side": "sell",
1678 "price": "50001.00",
1679 "size": "1.0",
1680 "ts": "1700000000001"
1681 }
1682 ]
1683 }"#,
1684 )
1685 .unwrap();
1686
1687 let trades = parse_ws_trades(&msg, None).unwrap();
1688 assert_eq!(trades.len(), 2);
1689
1690 assert_eq!(trades[0].id, Some("123456789".to_string()));
1691 assert_eq!(trades[0].side, ccxt_core::types::OrderSide::Buy);
1692
1693 assert_eq!(trades[1].id, Some("123456790".to_string()));
1694 assert_eq!(trades[1].side, ccxt_core::types::OrderSide::Sell);
1695 }
1696
1697 #[test]
1698 fn test_parse_ws_trade_sell_side() {
1699 let msg = serde_json::from_str(
1700 r#"{
1701 "action": "snapshot",
1702 "arg": {
1703 "instType": "SPOT",
1704 "channel": "trade",
1705 "instId": "BTCUSDT"
1706 },
1707 "data": [{
1708 "tradeId": "123456789",
1709 "symbol": "BTCUSDT",
1710 "side": "sell",
1711 "price": "50000.00",
1712 "size": "0.5",
1713 "ts": "1700000000000"
1714 }]
1715 }"#,
1716 )
1717 .unwrap();
1718
1719 let trade = parse_ws_trade(&msg, None).unwrap();
1720 assert_eq!(trade.side, ccxt_core::types::OrderSide::Sell);
1721 }
1722
1723 #[test]
1724 fn test_parse_ws_trade_missing_data() {
1725 let msg = serde_json::from_str(
1726 r#"{
1727 "action": "snapshot",
1728 "arg": {
1729 "instType": "SPOT",
1730 "channel": "trade",
1731 "instId": "BTCUSDT"
1732 }
1733 }"#,
1734 )
1735 .unwrap();
1736
1737 let result = parse_ws_trade(&msg, None);
1738 assert!(result.is_err());
1739 }
1740
1741 #[test]
1742 fn test_parse_ws_trades_empty_array() {
1743 let msg = serde_json::from_str(
1744 r#"{
1745 "action": "snapshot",
1746 "arg": {
1747 "instType": "SPOT",
1748 "channel": "trade",
1749 "instId": "BTCUSDT"
1750 },
1751 "data": []
1752 }"#,
1753 )
1754 .unwrap();
1755
1756 let trades = parse_ws_trades(&msg, None).unwrap();
1757 assert!(trades.is_empty());
1758 }
1759
1760 #[test]
1763 fn test_is_ticker_message_true() {
1764 let msg = serde_json::from_str(
1765 r#"{
1766 "action": "snapshot",
1767 "arg": {
1768 "instType": "SPOT",
1769 "channel": "ticker",
1770 "instId": "BTCUSDT"
1771 },
1772 "data": [{}]
1773 }"#,
1774 )
1775 .unwrap();
1776
1777 assert!(is_ticker_message(&msg, "BTCUSDT"));
1778 }
1779
1780 #[test]
1781 fn test_is_ticker_message_wrong_symbol() {
1782 let msg = serde_json::from_str(
1783 r#"{
1784 "action": "snapshot",
1785 "arg": {
1786 "instType": "SPOT",
1787 "channel": "ticker",
1788 "instId": "ETHUSDT"
1789 },
1790 "data": [{}]
1791 }"#,
1792 )
1793 .unwrap();
1794
1795 assert!(!is_ticker_message(&msg, "BTCUSDT"));
1796 }
1797
1798 #[test]
1799 fn test_is_ticker_message_wrong_channel() {
1800 let msg = serde_json::from_str(
1801 r#"{
1802 "action": "snapshot",
1803 "arg": {
1804 "instType": "SPOT",
1805 "channel": "trade",
1806 "instId": "BTCUSDT"
1807 },
1808 "data": [{}]
1809 }"#,
1810 )
1811 .unwrap();
1812
1813 assert!(!is_ticker_message(&msg, "BTCUSDT"));
1814 }
1815
1816 #[test]
1817 fn test_is_orderbook_message_books5() {
1818 let msg = serde_json::from_str(
1819 r#"{
1820 "arg": {
1821 "instType": "SPOT",
1822 "channel": "books5",
1823 "instId": "BTCUSDT"
1824 }
1825 }"#,
1826 )
1827 .unwrap();
1828
1829 assert!(is_orderbook_message(&msg, "BTCUSDT"));
1830 }
1831
1832 #[test]
1833 fn test_is_orderbook_message_books15() {
1834 let msg = serde_json::from_str(
1835 r#"{
1836 "arg": {
1837 "instType": "SPOT",
1838 "channel": "books15",
1839 "instId": "BTCUSDT"
1840 }
1841 }"#,
1842 )
1843 .unwrap();
1844
1845 assert!(is_orderbook_message(&msg, "BTCUSDT"));
1846 }
1847
1848 #[test]
1849 fn test_is_orderbook_message_books() {
1850 let msg = serde_json::from_str(
1851 r#"{
1852 "arg": {
1853 "instType": "SPOT",
1854 "channel": "books",
1855 "instId": "BTCUSDT"
1856 }
1857 }"#,
1858 )
1859 .unwrap();
1860
1861 assert!(is_orderbook_message(&msg, "BTCUSDT"));
1862 }
1863
1864 #[test]
1865 fn test_is_trade_message_true() {
1866 let msg = serde_json::from_str(
1867 r#"{
1868 "arg": {
1869 "instType": "SPOT",
1870 "channel": "trade",
1871 "instId": "BTCUSDT"
1872 }
1873 }"#,
1874 )
1875 .unwrap();
1876
1877 assert!(is_trade_message(&msg, "BTCUSDT"));
1878 }
1879
1880 #[test]
1881 fn test_is_trade_message_wrong_channel() {
1882 let msg = serde_json::from_str(
1883 r#"{
1884 "arg": {
1885 "instType": "SPOT",
1886 "channel": "ticker",
1887 "instId": "BTCUSDT"
1888 }
1889 }"#,
1890 )
1891 .unwrap();
1892
1893 assert!(!is_trade_message(&msg, "BTCUSDT"));
1894 }
1895
1896 #[test]
1899 fn test_format_unified_symbol_usdt() {
1900 assert_eq!(format_unified_symbol("BTCUSDT"), "BTC/USDT");
1901 assert_eq!(format_unified_symbol("ETHUSDT"), "ETH/USDT");
1902 }
1903
1904 #[test]
1905 fn test_format_unified_symbol_usdc() {
1906 assert_eq!(format_unified_symbol("BTCUSDC"), "BTC/USDC");
1907 }
1908
1909 #[test]
1910 fn test_format_unified_symbol_btc() {
1911 assert_eq!(format_unified_symbol("ETHBTC"), "ETH/BTC");
1912 }
1913
1914 #[test]
1915 fn test_format_unified_symbol_unknown() {
1916 assert_eq!(format_unified_symbol("BTCXYZ"), "BTCXYZ");
1918 }
1919
1920 #[test]
1923 fn test_is_account_message_true() {
1924 let msg: Value = serde_json::from_str(
1925 r#"{
1926 "arg": {"instType": "SPOT", "channel": "account", "coin": "default"},
1927 "data": [{}]
1928 }"#,
1929 )
1930 .unwrap();
1931 assert!(is_account_message(&msg));
1932 }
1933
1934 #[test]
1935 fn test_is_account_message_wrong_channel() {
1936 let msg: Value = serde_json::from_str(
1937 r#"{
1938 "arg": {"instType": "SPOT", "channel": "orders"},
1939 "data": [{}]
1940 }"#,
1941 )
1942 .unwrap();
1943 assert!(!is_account_message(&msg));
1944 }
1945
1946 #[test]
1947 fn test_is_orders_message_true() {
1948 let msg: Value = serde_json::from_str(
1949 r#"{
1950 "arg": {"instType": "SPOT", "channel": "orders", "instId": "default"},
1951 "data": [{}]
1952 }"#,
1953 )
1954 .unwrap();
1955 assert!(is_orders_message(&msg));
1956 }
1957
1958 #[test]
1959 fn test_is_orders_message_wrong_channel() {
1960 let msg: Value = serde_json::from_str(
1961 r#"{
1962 "arg": {"instType": "SPOT", "channel": "account"},
1963 "data": [{}]
1964 }"#,
1965 )
1966 .unwrap();
1967 assert!(!is_orders_message(&msg));
1968 }
1969
1970 #[test]
1973 fn test_parse_ws_balance_single_coin() {
1974 let msg: Value = serde_json::from_str(
1975 r#"{
1976 "arg": {"instType": "SPOT", "channel": "account", "coin": "default"},
1977 "data": [{
1978 "coin": "USDT",
1979 "available": "50000.50",
1980 "frozen": "10000.25",
1981 "uTime": "1700000000000"
1982 }]
1983 }"#,
1984 )
1985 .unwrap();
1986
1987 let balance = parse_ws_balance(&msg).unwrap();
1988 assert_eq!(balance.balances.len(), 1);
1989
1990 let usdt = balance.balances.get("USDT").unwrap();
1991 assert_eq!(usdt.free, dec!(50000.50));
1992 assert_eq!(usdt.used, dec!(10000.25));
1993 assert_eq!(usdt.total, dec!(60000.75));
1994 }
1995
1996 #[test]
1997 fn test_parse_ws_balance_multiple_coins() {
1998 let msg: Value = serde_json::from_str(
1999 r#"{
2000 "arg": {"instType": "SPOT", "channel": "account"},
2001 "data": [
2002 {"coin": "USDT", "available": "50000", "frozen": "10000"},
2003 {"coin": "BTC", "available": "1.5", "frozen": "0.5"}
2004 ]
2005 }"#,
2006 )
2007 .unwrap();
2008
2009 let balance = parse_ws_balance(&msg).unwrap();
2010 assert_eq!(balance.balances.len(), 2);
2011
2012 let usdt = balance.balances.get("USDT").unwrap();
2013 assert_eq!(usdt.free, dec!(50000));
2014 assert_eq!(usdt.total, dec!(60000));
2015
2016 let btc = balance.balances.get("BTC").unwrap();
2017 assert_eq!(btc.free, dec!(1.5));
2018 assert_eq!(btc.total, dec!(2.0));
2019 }
2020
2021 #[test]
2022 fn test_parse_ws_balance_missing_data() {
2023 let msg: Value = serde_json::from_str(
2024 r#"{
2025 "arg": {"instType": "SPOT", "channel": "account"}
2026 }"#,
2027 )
2028 .unwrap();
2029
2030 let result = parse_ws_balance(&msg);
2031 assert!(result.is_err());
2032 }
2033
2034 #[test]
2037 fn test_parse_ws_order_limit_buy() {
2038 let data: Value = serde_json::from_str(
2039 r#"{
2040 "instId": "BTCUSDT",
2041 "ordId": "123456789",
2042 "clOrdId": "client123",
2043 "side": "buy",
2044 "ordType": "limit",
2045 "px": "50000",
2046 "sz": "0.1",
2047 "accFillSz": "0.05",
2048 "avgPx": "49999",
2049 "status": "partially_filled",
2050 "fee": "-0.5",
2051 "feeCcy": "USDT",
2052 "cTime": "1700000000000"
2053 }"#,
2054 )
2055 .unwrap();
2056
2057 let order = parse_ws_order(&data).unwrap();
2058 assert_eq!(order.symbol, "BTC/USDT");
2059 assert_eq!(order.id, "123456789");
2060 assert_eq!(order.client_order_id, Some("client123".to_string()));
2061 assert_eq!(order.side, OrderSide::Buy);
2062 assert_eq!(order.order_type, OrderType::Limit);
2063 assert_eq!(order.price, Some(dec!(50000)));
2064 assert_eq!(order.amount, dec!(0.1));
2065 assert_eq!(order.filled, Some(dec!(0.05)));
2066 assert_eq!(order.average, Some(dec!(49999)));
2067 assert_eq!(order.status, OrderStatus::Open); assert_eq!(order.timestamp, Some(1700000000000));
2069 assert!(order.fee.is_some());
2070 let fee = order.fee.unwrap();
2071 assert_eq!(fee.cost, dec!(0.5)); }
2073
2074 #[test]
2075 fn test_parse_ws_order_market_sell() {
2076 let data: Value = serde_json::from_str(
2077 r#"{
2078 "instId": "ETHUSDT",
2079 "ordId": "987654321",
2080 "side": "sell",
2081 "ordType": "market",
2082 "sz": "1.0",
2083 "accFillSz": "1.0",
2084 "avgPx": "2000",
2085 "status": "filled",
2086 "fee": "-2.0",
2087 "feeCcy": "USDT",
2088 "cTime": "1700000000000"
2089 }"#,
2090 )
2091 .unwrap();
2092
2093 let order = parse_ws_order(&data).unwrap();
2094 assert_eq!(order.symbol, "ETH/USDT");
2095 assert_eq!(order.side, OrderSide::Sell);
2096 assert_eq!(order.order_type, OrderType::Market);
2097 assert_eq!(order.status, OrderStatus::Closed);
2098 assert_eq!(order.cost, Some(dec!(2000))); }
2100
2101 #[test]
2102 fn test_parse_ws_order_cancelled() {
2103 let data: Value = serde_json::from_str(
2104 r#"{
2105 "instId": "BTCUSDT",
2106 "ordId": "111222333",
2107 "side": "buy",
2108 "ordType": "limit",
2109 "px": "45000",
2110 "sz": "0.5",
2111 "status": "cancelled",
2112 "cTime": "1700000000000"
2113 }"#,
2114 )
2115 .unwrap();
2116
2117 let order = parse_ws_order(&data).unwrap();
2118 assert_eq!(order.status, OrderStatus::Cancelled);
2119 }
2120
2121 #[test]
2124 fn test_parse_ws_my_trade_basic() {
2125 let data: Value = serde_json::from_str(
2126 r#"{
2127 "instId": "BTCUSDT",
2128 "ordId": "123456789",
2129 "tradeId": "trade001",
2130 "side": "buy",
2131 "fillPx": "50000",
2132 "fillSz": "0.1",
2133 "fillTime": "1700000000000",
2134 "fillFee": "-5.0",
2135 "fillFeeCcy": "USDT"
2136 }"#,
2137 )
2138 .unwrap();
2139
2140 let trade = parse_ws_my_trade(&data).unwrap();
2141 assert_eq!(trade.symbol, "BTC/USDT");
2142 assert_eq!(trade.id, Some("trade001".to_string()));
2143 assert_eq!(trade.order, Some("123456789".to_string()));
2144 assert_eq!(trade.side, OrderSide::Buy);
2145 assert_eq!(trade.price, Price::new(dec!(50000)));
2146 assert_eq!(trade.amount, Amount::new(dec!(0.1)));
2147 assert_eq!(trade.cost, Some(Cost::new(dec!(5000)))); assert_eq!(trade.timestamp, 1700000000000);
2149 assert!(trade.fee.is_some());
2150 let fee = trade.fee.unwrap();
2151 assert_eq!(fee.cost, dec!(5.0)); assert_eq!(fee.currency, "USDT");
2153 }
2154
2155 #[test]
2156 fn test_parse_ws_my_trade_sell() {
2157 let data: Value = serde_json::from_str(
2158 r#"{
2159 "instId": "ETHUSDT",
2160 "ordId": "987654321",
2161 "tradeId": "trade002",
2162 "side": "sell",
2163 "fillPx": "2000",
2164 "fillSz": "0.5",
2165 "fillTime": "1700000000001"
2166 }"#,
2167 )
2168 .unwrap();
2169
2170 let trade = parse_ws_my_trade(&data).unwrap();
2171 assert_eq!(trade.symbol, "ETH/USDT");
2172 assert_eq!(trade.side, OrderSide::Sell);
2173 assert_eq!(trade.price, Price::new(dec!(2000)));
2174 assert_eq!(trade.amount, Amount::new(dec!(0.5)));
2175 assert_eq!(trade.cost, Some(Cost::new(dec!(1000)))); assert!(trade.fee.is_none()); }
2178
2179 #[test]
2180 fn test_parse_ws_my_trade_fallback_fields() {
2181 let data: Value = serde_json::from_str(
2183 r#"{
2184 "instId": "BTCUSDT",
2185 "ordId": "111222333",
2186 "side": "buy",
2187 "price": "48000",
2188 "baseVolume": "0.2",
2189 "uTime": "1700000000002",
2190 "fee": "-1.5",
2191 "feeCcy": "USDT"
2192 }"#,
2193 )
2194 .unwrap();
2195
2196 let trade = parse_ws_my_trade(&data).unwrap();
2197 assert_eq!(trade.price, Price::new(dec!(48000)));
2198 assert_eq!(trade.amount, Amount::new(dec!(0.2)));
2199 assert_eq!(trade.timestamp, 1700000000002);
2200 assert!(trade.fee.is_some());
2201 }
2202}