1#![allow(unused_imports)]
20use anyhow::Context;
21use async_trait::async_trait;
22use derive_builder::Builder;
23use rust_decimal::prelude::*;
24use serde::{Deserialize, Serialize};
25use serde_json::Value;
26use std::{collections::BTreeMap, sync::Arc};
27
28use crate::common::{
29 errors::WebsocketError,
30 models::{ParamBuildError, WebsocketApiResponse},
31 utils::remove_empty_value,
32 websocket::{WebsocketApi, WebsocketMessageSendOptions},
33};
34use crate::spot::websocket_api::models;
35
36#[async_trait]
37pub trait GeneralApi: Send + Sync {
38 async fn exchange_info(
39 &self,
40 params: ExchangeInfoParams,
41 ) -> anyhow::Result<WebsocketApiResponse<Box<models::ExchangeInfoResponseResult>>>;
42 async fn execution_rules(
43 &self,
44 params: ExecutionRulesParams,
45 ) -> anyhow::Result<WebsocketApiResponse<Box<models::ExecutionRulesResponseResult>>>;
46 async fn ping(
47 &self,
48 params: PingParams,
49 ) -> anyhow::Result<WebsocketApiResponse<serde_json::Value>>;
50 async fn time(
51 &self,
52 params: TimeParams,
53 ) -> anyhow::Result<WebsocketApiResponse<Box<models::TimeResponseResult>>>;
54}
55
56#[derive(Clone)]
57pub struct GeneralApiClient {
58 websocket_api_base: Arc<WebsocketApi>,
59}
60
61impl GeneralApiClient {
62 pub fn new(websocket_api_base: Arc<WebsocketApi>) -> Self {
63 Self { websocket_api_base }
64 }
65}
66
67#[allow(non_camel_case_types)]
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub enum ExchangeInfoSymbolStatusEnum {
70 #[serde(rename = "TRADING")]
71 Trading,
72 #[serde(rename = "END_OF_DAY")]
73 EndOfDay,
74 #[serde(rename = "HALT")]
75 Halt,
76 #[serde(rename = "BREAK")]
77 Break,
78 #[serde(rename = "NON_REPRESENTABLE")]
79 NonRepresentable,
80}
81
82impl ExchangeInfoSymbolStatusEnum {
83 #[must_use]
84 pub fn as_str(&self) -> &'static str {
85 match self {
86 Self::Trading => "TRADING",
87 Self::EndOfDay => "END_OF_DAY",
88 Self::Halt => "HALT",
89 Self::Break => "BREAK",
90 Self::NonRepresentable => "NON_REPRESENTABLE",
91 }
92 }
93}
94
95impl std::str::FromStr for ExchangeInfoSymbolStatusEnum {
96 type Err = Box<dyn std::error::Error + Send + Sync>;
97
98 fn from_str(s: &str) -> Result<Self, Self::Err> {
99 match s {
100 "TRADING" => Ok(Self::Trading),
101 "END_OF_DAY" => Ok(Self::EndOfDay),
102 "HALT" => Ok(Self::Halt),
103 "BREAK" => Ok(Self::Break),
104 "NON_REPRESENTABLE" => Ok(Self::NonRepresentable),
105 other => Err(format!("invalid ExchangeInfoSymbolStatusEnum: {}", other).into()),
106 }
107 }
108}
109
110#[allow(non_camel_case_types)]
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub enum ExecutionRulesSymbolStatusEnum {
113 #[serde(rename = "TRADING")]
114 Trading,
115 #[serde(rename = "END_OF_DAY")]
116 EndOfDay,
117 #[serde(rename = "HALT")]
118 Halt,
119 #[serde(rename = "BREAK")]
120 Break,
121 #[serde(rename = "NON_REPRESENTABLE")]
122 NonRepresentable,
123}
124
125impl ExecutionRulesSymbolStatusEnum {
126 #[must_use]
127 pub fn as_str(&self) -> &'static str {
128 match self {
129 Self::Trading => "TRADING",
130 Self::EndOfDay => "END_OF_DAY",
131 Self::Halt => "HALT",
132 Self::Break => "BREAK",
133 Self::NonRepresentable => "NON_REPRESENTABLE",
134 }
135 }
136}
137
138impl std::str::FromStr for ExecutionRulesSymbolStatusEnum {
139 type Err = Box<dyn std::error::Error + Send + Sync>;
140
141 fn from_str(s: &str) -> Result<Self, Self::Err> {
142 match s {
143 "TRADING" => Ok(Self::Trading),
144 "END_OF_DAY" => Ok(Self::EndOfDay),
145 "HALT" => Ok(Self::Halt),
146 "BREAK" => Ok(Self::Break),
147 "NON_REPRESENTABLE" => Ok(Self::NonRepresentable),
148 other => Err(format!("invalid ExecutionRulesSymbolStatusEnum: {}", other).into()),
149 }
150 }
151}
152
153#[derive(Clone, Debug, Builder, Default)]
158#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
159pub struct ExchangeInfoParams {
160 #[builder(setter(into), default)]
164 pub id: Option<String>,
165 #[builder(setter(into), default)]
169 pub symbol: Option<String>,
170 #[builder(setter(into), default)]
174 pub symbols: Option<Vec<String>>,
175 #[builder(setter(into), default)]
180 pub permissions: Option<Vec<String>>,
181 #[builder(setter(into), default)]
186 pub show_permission_sets: Option<bool>,
187 #[builder(setter(into), default)]
192 pub symbol_status: Option<ExchangeInfoSymbolStatusEnum>,
193}
194
195impl ExchangeInfoParams {
196 #[must_use]
199 pub fn builder() -> ExchangeInfoParamsBuilder {
200 ExchangeInfoParamsBuilder::default()
201 }
202}
203#[derive(Clone, Debug, Builder, Default)]
208#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
209pub struct ExecutionRulesParams {
210 #[builder(setter(into), default)]
214 pub id: Option<String>,
215 #[builder(setter(into), default)]
219 pub symbol: Option<String>,
220 #[builder(setter(into), default)]
224 pub symbols: Option<Vec<String>>,
225 #[builder(setter(into), default)]
230 pub symbol_status: Option<ExecutionRulesSymbolStatusEnum>,
231}
232
233impl ExecutionRulesParams {
234 #[must_use]
237 pub fn builder() -> ExecutionRulesParamsBuilder {
238 ExecutionRulesParamsBuilder::default()
239 }
240}
241#[derive(Clone, Debug, Builder, Default)]
246#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
247pub struct PingParams {
248 #[builder(setter(into), default)]
252 pub id: Option<String>,
253}
254
255impl PingParams {
256 #[must_use]
259 pub fn builder() -> PingParamsBuilder {
260 PingParamsBuilder::default()
261 }
262}
263#[derive(Clone, Debug, Builder, Default)]
268#[builder(pattern = "owned", build_fn(error = "ParamBuildError"))]
269pub struct TimeParams {
270 #[builder(setter(into), default)]
274 pub id: Option<String>,
275}
276
277impl TimeParams {
278 #[must_use]
281 pub fn builder() -> TimeParamsBuilder {
282 TimeParamsBuilder::default()
283 }
284}
285
286#[async_trait]
287impl GeneralApi for GeneralApiClient {
288 async fn exchange_info(
289 &self,
290 params: ExchangeInfoParams,
291 ) -> anyhow::Result<WebsocketApiResponse<Box<models::ExchangeInfoResponseResult>>> {
292 let ExchangeInfoParams {
293 id,
294 symbol,
295 symbols,
296 permissions,
297 show_permission_sets,
298 symbol_status,
299 } = params;
300
301 let mut payload: BTreeMap<String, Value> = BTreeMap::new();
302 if let Some(value) = id {
303 payload.insert("id".to_string(), serde_json::json!(value));
304 }
305 if let Some(value) = symbol {
306 payload.insert("symbol".to_string(), serde_json::json!(value));
307 }
308 if let Some(value) = symbols {
309 payload.insert("symbols".to_string(), serde_json::json!(value));
310 }
311 if let Some(value) = permissions {
312 payload.insert("permissions".to_string(), serde_json::json!(value));
313 }
314 if let Some(value) = show_permission_sets {
315 payload.insert("showPermissionSets".to_string(), serde_json::json!(value));
316 }
317 if let Some(value) = symbol_status {
318 payload.insert("symbolStatus".to_string(), serde_json::json!(value));
319 }
320 let payload = remove_empty_value(payload);
321
322 self.websocket_api_base
323 .send_message::<Box<models::ExchangeInfoResponseResult>>(
324 "/exchangeInfo".trim_start_matches('/'),
325 payload,
326 WebsocketMessageSendOptions::new(),
327 )
328 .await
329 .map_err(anyhow::Error::from)?
330 .into_iter()
331 .next()
332 .ok_or(WebsocketError::NoResponse)
333 .map_err(anyhow::Error::from)
334 }
335
336 async fn execution_rules(
337 &self,
338 params: ExecutionRulesParams,
339 ) -> anyhow::Result<WebsocketApiResponse<Box<models::ExecutionRulesResponseResult>>> {
340 let ExecutionRulesParams {
341 id,
342 symbol,
343 symbols,
344 symbol_status,
345 } = params;
346
347 let mut payload: BTreeMap<String, Value> = BTreeMap::new();
348 if let Some(value) = id {
349 payload.insert("id".to_string(), serde_json::json!(value));
350 }
351 if let Some(value) = symbol {
352 payload.insert("symbol".to_string(), serde_json::json!(value));
353 }
354 if let Some(value) = symbols {
355 payload.insert("symbols".to_string(), serde_json::json!(value));
356 }
357 if let Some(value) = symbol_status {
358 payload.insert("symbolStatus".to_string(), serde_json::json!(value));
359 }
360 let payload = remove_empty_value(payload);
361
362 self.websocket_api_base
363 .send_message::<Box<models::ExecutionRulesResponseResult>>(
364 "/executionRules".trim_start_matches('/'),
365 payload,
366 WebsocketMessageSendOptions::new(),
367 )
368 .await
369 .map_err(anyhow::Error::from)?
370 .into_iter()
371 .next()
372 .ok_or(WebsocketError::NoResponse)
373 .map_err(anyhow::Error::from)
374 }
375
376 async fn ping(
377 &self,
378 params: PingParams,
379 ) -> anyhow::Result<WebsocketApiResponse<serde_json::Value>> {
380 let PingParams { id } = params;
381
382 let mut payload: BTreeMap<String, Value> = BTreeMap::new();
383 if let Some(value) = id {
384 payload.insert("id".to_string(), serde_json::json!(value));
385 }
386 let payload = remove_empty_value(payload);
387
388 self.websocket_api_base
389 .send_message::<serde_json::Value>(
390 "/ping".trim_start_matches('/'),
391 payload,
392 WebsocketMessageSendOptions::new(),
393 )
394 .await
395 .map_err(anyhow::Error::from)?
396 .into_iter()
397 .next()
398 .ok_or(WebsocketError::NoResponse)
399 .map_err(anyhow::Error::from)
400 }
401
402 async fn time(
403 &self,
404 params: TimeParams,
405 ) -> anyhow::Result<WebsocketApiResponse<Box<models::TimeResponseResult>>> {
406 let TimeParams { id } = params;
407
408 let mut payload: BTreeMap<String, Value> = BTreeMap::new();
409 if let Some(value) = id {
410 payload.insert("id".to_string(), serde_json::json!(value));
411 }
412 let payload = remove_empty_value(payload);
413
414 self.websocket_api_base
415 .send_message::<Box<models::TimeResponseResult>>(
416 "/time".trim_start_matches('/'),
417 payload,
418 WebsocketMessageSendOptions::new(),
419 )
420 .await
421 .map_err(anyhow::Error::from)?
422 .into_iter()
423 .next()
424 .ok_or(WebsocketError::NoResponse)
425 .map_err(anyhow::Error::from)
426 }
427}
428
#[cfg(all(test, feature = "spot"))]
mod tests {
    //! Round-trip tests for [`GeneralApiClient`].
    //!
    //! Each test wires the client to an in-memory channel instead of a real
    //! socket, captures the outgoing request frame, and (where applicable)
    //! feeds a canned server reply back through `WebsocketHandler::on_message`.

    use super::*;
    use crate::TOKIO_SHARED_RT;
    use crate::common::websocket::{WebsocketApi, WebsocketConnection, WebsocketHandler};
    use crate::config::ConfigurationWebsocketApi;
    use crate::errors::WebsocketError;
    use crate::models::WebsocketApiRateLimit;
    use serde_json::{Value, json};
    use tokio::spawn;
    use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
    use tokio::task::JoinHandle;
    use tokio::time::{Duration, timeout};
    use tokio_tungstenite::tungstenite::Message;

    /// Upper bound for every individual wait on the mock channel or a spawned task.
    const IO_TIMEOUT: Duration = Duration::from_secs(1);

    /// Error text expected from the client for the canned `-2010` error reply.
    const SERVER_ERROR_TEXT: &str = "Server‐side response error (code -2010): Account has insufficient balance for requested action.";

    /// Builds a connected `WebsocketApi` whose outgoing frames land in the returned receiver.
    async fn setup() -> (
        Arc<WebsocketApi>,
        Arc<WebsocketConnection>,
        UnboundedReceiver<Message>,
    ) {
        let conn = WebsocketConnection::new("test-conn");
        let (tx, rx) = unbounded_channel::<Message>();
        {
            // Scope the lock so it is released before the API is connected.
            let mut conn_state = conn.state.lock().await;
            conn_state.ws_write_tx = Some(tx);
        }

        let config = ConfigurationWebsocketApi::builder()
            .api_key("key")
            .api_secret("secret")
            .build()
            .expect("Failed to build configuration");
        let ws_api = WebsocketApi::new(config, vec![conn.clone()]);
        conn.set_handler(ws_api.clone() as Arc<dyn WebsocketHandler>)
            .await;
        ws_api.clone().connect().await.unwrap();

        (ws_api, conn, rx)
    }

    /// Waits for the next outgoing frame and parses it as JSON.
    ///
    /// Panics when nothing is sent within [`IO_TIMEOUT`], when the channel is
    /// closed, or when the frame is not a text message.
    async fn next_request(rx: &mut UnboundedReceiver<Message>) -> Value {
        let sent = timeout(IO_TIMEOUT, rx.recv())
            .await
            .expect("send should occur")
            .expect("channel closed");
        let Message::Text(text) = sent else {
            panic!("expected Message Text")
        };
        serde_json::from_str(&text).expect("request should be valid JSON")
    }

    /// Extracts the string `id` of a captured request.
    fn request_id(request: &Value) -> String {
        request["id"]
            .as_str()
            .expect("request should carry a string id")
            .to_string()
    }

    /// Parses a canned reply and rewrites its `id` to match the captured request.
    fn success_response(template: &str, id: &str) -> Value {
        let mut resp_json: Value =
            serde_json::from_str(template).expect("response template should be valid JSON");
        resp_json["id"] = id.into();
        resp_json
    }

    /// Deserializes the `result` (or, failing that, `response`) member of a reply.
    fn parse_expected_result<T: serde::de::DeserializeOwned>(resp_json: &Value) -> T {
        let raw_data = resp_json
            .get("result")
            .or_else(|| resp_json.get("response"))
            .expect("no response in JSON");
        serde_json::from_value(raw_data.clone()).expect("should parse raw response")
    }

    /// Deserializes the top-level `rateLimits` of a reply.
    ///
    /// A missing, empty or non-array member yields `None`.
    fn parse_expected_rate_limits(resp_json: &Value) -> Option<Vec<WebsocketApiRateLimit>> {
        let raw_rate_limits = resp_json.get("rateLimits")?;
        match raw_rate_limits.as_array() {
            Some(entries) if !entries.is_empty() => Some(
                serde_json::from_value(raw_rate_limits.clone())
                    .expect("should parse rateLimits array"),
            ),
            _ => None,
        }
    }

    /// Canned status-400 reply carrying error code `-2010` for the given request id.
    fn insufficient_balance_response(id: &str) -> Value {
        json!({
            "id": id,
            "status": 400,
            "error": {
                "code": -2010,
                "msg": "Account has insufficient balance for requested action.",
            },
            "rateLimits": [
                {
                    "rateLimitType": "ORDERS",
                    "interval": "SECOND",
                    "intervalNum": 10,
                    "limit": 50,
                    "count": 13
                },
            ],
        })
    }

    /// Answers the pending request with the canned error reply and asserts
    /// that the spawned call fails with [`SERVER_ERROR_TEXT`].
    async fn assert_server_error_roundtrip<T>(
        ws_api: &Arc<WebsocketApi>,
        conn: &Arc<WebsocketConnection>,
        rx: &mut UnboundedReceiver<Message>,
        handle: JoinHandle<anyhow::Result<T>>,
    ) {
        let request = next_request(rx).await;
        let resp_json = insufficient_balance_response(&request_id(&request));
        WebsocketHandler::on_message(&**ws_api, resp_json.to_string(), conn.clone()).await;

        let join = timeout(IO_TIMEOUT, handle)
            .await
            .expect("task should finish after the error response");
        match join {
            Ok(Err(e)) => {
                let msg = e.to_string();
                assert!(
                    msg.contains(SERVER_ERROR_TEXT),
                    "Expected error msg to contain server error, got: {msg}"
                );
            }
            Ok(Ok(_)) => panic!("Expected error"),
            Err(_) => panic!("Task panicked"),
        }
    }

    /// Lets the pending request go unanswered and asserts that the spawned
    /// call fails with `WebsocketError::Timeout`.
    async fn assert_request_times_out<T>(
        rx: &mut UnboundedReceiver<Message>,
        handle: JoinHandle<anyhow::Result<T>>,
    ) {
        // The request must have been sent, but no reply is ever delivered.
        let _request = next_request(rx).await;

        let result = handle.await.expect("task completed");
        match result {
            Err(e) => match e.downcast_ref::<WebsocketError>() {
                Some(inner) => assert!(matches!(inner, WebsocketError::Timeout)),
                None => panic!("Unexpected error type: {:?}", e),
            },
            Ok(_) => panic!("Expected timeout error"),
        }
    }

    #[test]
    fn exchange_info_success() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, conn, mut rx) = setup().await;
            let client = GeneralApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = ExchangeInfoParams::builder().build().unwrap();
                client.exchange_info(params).await
            });

            let request = next_request(&mut rx).await;
            let id = request_id(&request);
            assert_eq!(request["method"], "exchangeInfo");

            let resp_json = success_response(
                r#"{"id":"5494febb-d167-46a2-996d-70533eb4d976","status":200,"result":{"timezone":"UTC","serverTime":1655969291181,"rateLimits":[{"rateLimitType":"REQUEST_WEIGHT","interval":"MINUTE","intervalNum":1,"limit":6000},{"rateLimitType":"ORDERS","interval":"SECOND","intervalNum":10,"limit":50},{"rateLimitType":"ORDERS","interval":"DAY","intervalNum":1,"limit":160000},{"rateLimitType":"CONNECTIONS","interval":"MINUTE","intervalNum":5,"limit":300}],"exchangeFilters":[],"symbols":[{"symbol":"BNBBTC","status":"TRADING","baseAsset":"BNB","baseAssetPrecision":8,"quoteAsset":"BTC","quotePrecision":8,"quoteAssetPrecision":8,"baseCommissionPrecision":8,"quoteCommissionPrecision":8,"orderTypes":["LIMIT LIMIT_MAKER MARKET STOP_LOSS_LIMIT TAKE_PROFIT_LIMIT"],"icebergAllowed":true,"ocoAllowed":true,"otoAllowed":true,"opoAllowed":true,"quoteOrderQtyMarketAllowed":true,"allowTrailingStop":true,"cancelReplaceAllowed":true,"amendAllowed":false,"pegInstructionsAllowed":true,"isSpotTradingAllowed":true,"isMarginTradingAllowed":true,"filters":[{"filterType":"PRICE_FILTER","minPrice":"0.00000100","maxPrice":"100000.00000000","tickSize":"0.00000100"},{"filterType":"LOT_SIZE","minQty":"0.00100000","maxQty":"100000.00000000","stepSize":"0.00100000"}],"permissions":[],"permissionSets":[["SPOT","MARGIN","TRD_GRP_004"]],"defaultSelfTradePreventionMode":"NONE","allowedSelfTradePreventionModes":["NONE"]}],"sors":[{"baseAsset":"BTC","symbols":["BTCUSDT BTCUSDC"]}]},"rateLimits":[{"rateLimitType":"REQUEST_WEIGHT","interval":"MINUTE","intervalNum":1,"limit":6000},{"rateLimitType":"ORDERS","interval":"DAY","intervalNum":1,"limit":160000},{"rateLimitType":"RAW_REQUESTS","interval":"MINUTE","intervalNum":5,"limit":61000}]}"#,
                &id,
            );
            let expected_data: Box<models::ExchangeInfoResponseResult> =
                parse_expected_result(&resp_json);
            let expected_rate_limits = parse_expected_rate_limits(&resp_json);

            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;

            let response = timeout(IO_TIMEOUT, handle)
                .await
                .expect("task done")
                .expect("no panic")
                .expect("no error");

            // Copy the rate limits out before `data()` is called on the response.
            let response_rate_limits = response.rate_limits.clone();
            let response_data = response.data().expect("deserialize data");

            assert_eq!(response_rate_limits, expected_rate_limits);
            assert_eq!(response_data, expected_data);
        });
    }

    #[test]
    fn exchange_info_error_response() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, conn, mut rx) = setup().await;
            let client = GeneralApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = ExchangeInfoParams::builder().build().unwrap();
                client.exchange_info(params).await
            });

            assert_server_error_roundtrip(&ws_api, &conn, &mut rx, handle).await;
        });
    }

    #[test]
    fn exchange_info_request_timeout() {
        TOKIO_SHARED_RT.block_on(async {
            // `_conn` is kept bound so the connection outlives the request.
            let (ws_api, _conn, mut rx) = setup().await;
            let client = GeneralApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = ExchangeInfoParams::builder().build().unwrap();
                client.exchange_info(params).await
            });

            assert_request_times_out(&mut rx, handle).await;
        });
    }

    #[test]
    fn execution_rules_success() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, conn, mut rx) = setup().await;
            let client = GeneralApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = ExecutionRulesParams::builder().build().unwrap();
                client.execution_rules(params).await
            });

            let request = next_request(&mut rx).await;
            let id = request_id(&request);
            assert_eq!(request["method"], "executionRules");

            let resp_json = success_response(
                r#"{"id":"5162affb-0aba-4821-b475-f2625006eb43","status":200,"result":{"symbolRules":[{"symbol":"BAZUSD","rules":[{"ruleType":"PRICE_RANGE","bidLimitMultUp":"1.0001","bidLimitMultDown":"0.9999","askLimitMultUp":"1.0001","askLimitMultDown":"0.9999"}]}]}}"#,
                &id,
            );
            let expected_data: Box<models::ExecutionRulesResponseResult> =
                parse_expected_result(&resp_json);
            let expected_rate_limits = parse_expected_rate_limits(&resp_json);

            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;

            let response = timeout(IO_TIMEOUT, handle)
                .await
                .expect("task done")
                .expect("no panic")
                .expect("no error");

            // Copy the rate limits out before `data()` is called on the response.
            let response_rate_limits = response.rate_limits.clone();
            let response_data = response.data().expect("deserialize data");

            assert_eq!(response_rate_limits, expected_rate_limits);
            assert_eq!(response_data, expected_data);
        });
    }

    #[test]
    fn execution_rules_error_response() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, conn, mut rx) = setup().await;
            let client = GeneralApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = ExecutionRulesParams::builder().build().unwrap();
                client.execution_rules(params).await
            });

            assert_server_error_roundtrip(&ws_api, &conn, &mut rx, handle).await;
        });
    }

    #[test]
    fn execution_rules_request_timeout() {
        TOKIO_SHARED_RT.block_on(async {
            // `_conn` is kept bound so the connection outlives the request.
            let (ws_api, _conn, mut rx) = setup().await;
            let client = GeneralApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = ExecutionRulesParams::builder().build().unwrap();
                client.execution_rules(params).await
            });

            assert_request_times_out(&mut rx, handle).await;
        });
    }

    #[test]
    fn ping_success() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, conn, mut rx) = setup().await;
            let client = GeneralApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = PingParams::builder().build().unwrap();
                client.ping(params).await
            });

            let request = next_request(&mut rx).await;
            let id = request_id(&request);
            assert_eq!(request["method"], "ping");

            let resp_json = success_response(
                r#"{"id":"922bcc6e-9de8-440d-9e84-7c80933a8d0d","status":200,"result":{},"rateLimits":[{"rateLimitType":"REQUEST_WEIGHT","interval":"MINUTE","intervalNum":1,"limit":6000,"count":1}]}"#,
                &id,
            );
            let expected_data: serde_json::Value = parse_expected_result(&resp_json);
            let expected_rate_limits = parse_expected_rate_limits(&resp_json);

            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;

            let response = timeout(IO_TIMEOUT, handle)
                .await
                .expect("task done")
                .expect("no panic")
                .expect("no error");

            // Copy the rate limits out before `data()` is called on the response.
            let response_rate_limits = response.rate_limits.clone();
            let response_data = response.data().expect("deserialize data");

            assert_eq!(response_rate_limits, expected_rate_limits);
            assert_eq!(response_data, expected_data);
        });
    }

    #[test]
    fn ping_error_response() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, conn, mut rx) = setup().await;
            let client = GeneralApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = PingParams::builder().build().unwrap();
                client.ping(params).await
            });

            assert_server_error_roundtrip(&ws_api, &conn, &mut rx, handle).await;
        });
    }

    #[test]
    fn ping_request_timeout() {
        TOKIO_SHARED_RT.block_on(async {
            // `_conn` is kept bound so the connection outlives the request.
            let (ws_api, _conn, mut rx) = setup().await;
            let client = GeneralApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = PingParams::builder().build().unwrap();
                client.ping(params).await
            });

            assert_request_times_out(&mut rx, handle).await;
        });
    }

    #[test]
    fn time_success() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, conn, mut rx) = setup().await;
            let client = GeneralApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = TimeParams::builder().build().unwrap();
                client.time(params).await
            });

            let request = next_request(&mut rx).await;
            let id = request_id(&request);
            assert_eq!(request["method"], "time");

            let resp_json = success_response(
                r#"{"id":"187d3cb2-942d-484c-8271-4e2141bbadb1","status":200,"result":{"serverTime":1656400526260},"rateLimits":[{"rateLimitType":"REQUEST_WEIGHT","interval":"MINUTE","intervalNum":1,"limit":6000,"count":1}]}"#,
                &id,
            );
            let expected_data: Box<models::TimeResponseResult> = parse_expected_result(&resp_json);
            let expected_rate_limits = parse_expected_rate_limits(&resp_json);

            WebsocketHandler::on_message(&*ws_api, resp_json.to_string(), conn.clone()).await;

            let response = timeout(IO_TIMEOUT, handle)
                .await
                .expect("task done")
                .expect("no panic")
                .expect("no error");

            // Copy the rate limits out before `data()` is called on the response.
            let response_rate_limits = response.rate_limits.clone();
            let response_data = response.data().expect("deserialize data");

            assert_eq!(response_rate_limits, expected_rate_limits);
            assert_eq!(response_data, expected_data);
        });
    }

    #[test]
    fn time_error_response() {
        TOKIO_SHARED_RT.block_on(async {
            let (ws_api, conn, mut rx) = setup().await;
            let client = GeneralApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = TimeParams::builder().build().unwrap();
                client.time(params).await
            });

            assert_server_error_roundtrip(&ws_api, &conn, &mut rx, handle).await;
        });
    }

    #[test]
    fn time_request_timeout() {
        TOKIO_SHARED_RT.block_on(async {
            // `_conn` is kept bound so the connection outlives the request.
            let (ws_api, _conn, mut rx) = setup().await;
            let client = GeneralApiClient::new(ws_api.clone());

            let handle = spawn(async move {
                let params = TimeParams::builder().build().unwrap();
                client.time(params).await
            });

            assert_request_times_out(&mut rx, handle).await;
        });
    }
}