1use std::sync::Arc;
29use std::time::{Instant, SystemTime, UNIX_EPOCH};
30
31use digdigdig3::connector_manager::ExchangeHub;
32use digdigdig3::core::traits::{Credentials, MarketData, WebSocketConnector};
33use digdigdig3::core::types::{
34 AccountType, BalanceQuery, ExchangeId, PositionQuery,
35 StreamEvent, StreamType, SubscriptionRequest, Symbol, SymbolInput,
36 UserTradeFilter,
37};
38use digdigdig3::core::utils::SymbolNormalizer;
39use digdigdig3::l2::free::moex::MoexWebSocket;
40use digdigdig3::testing::harness::TestHarness;
41use futures_util::StreamExt;
42use tokio::time::{timeout, Duration};
43
44mod cli {
49 #[derive(Debug, Clone)]
50 pub struct Args {
51 pub exchange_filter: Option<String>,
52 pub run_market: bool,
53 pub run_trading: bool,
54 pub json_out: Option<String>,
55 }
56
57 impl Args {
58 pub fn parse() -> Self {
59 let argv: Vec<String> = std::env::args().collect();
60 let mut filter = None;
61 let mut market = false;
62 let mut trading = false;
63 let mut all = false;
64 let mut json_out = None;
65 let mut i = 1usize;
66 while i < argv.len() {
67 match argv[i].as_str() {
68 "--exchange" => {
69 i += 1;
70 if i < argv.len() { filter = Some(argv[i].clone()); }
71 }
72 "--market" => { market = true; }
73 "--trading" => { trading = true; }
74 "--all" => { all = true; }
75 "--json-out" => {
76 i += 1;
77 if i < argv.len() { json_out = Some(argv[i].clone()); }
78 }
79 _ => {}
80 }
81 i += 1;
82 }
83 if all { market = true; trading = true; }
84 if !market && !trading { market = true; }
86 Self { exchange_filter: filter, run_market: market, run_trading: trading, json_out }
87 }
88 }
89}
90
91mod result_types {
96 use serde::Serialize;
97
98 #[derive(Debug, Clone, Serialize)]
100 #[serde(tag = "status", content = "detail")]
101 pub enum MethodResult {
102 Ok(String),
103 Empty,
104 Err(String),
105 Timeout,
106 Unsupported(String),
107 Skipped,
108 }
109
110 impl MethodResult {
111 pub fn cell(&self) -> &'static str {
112 match self {
113 MethodResult::Ok(_) => "OK ",
114 MethodResult::Empty => "EMPT",
115 MethodResult::Err(_) => "ERR ",
116 MethodResult::Timeout => "TIME",
117 MethodResult::Unsupported(_) => "-- ",
118 MethodResult::Skipped => "SKIP",
119 }
120 }
121 pub fn is_ok(&self) -> bool { matches!(self, MethodResult::Ok(_)) }
122 pub fn is_issue(&self) -> bool { matches!(self, MethodResult::Err(_) | MethodResult::Empty | MethodResult::Timeout) }
123 pub fn detail(&self) -> Option<&str> {
124 match self {
125 MethodResult::Ok(s) | MethodResult::Err(s) | MethodResult::Unsupported(s) => Some(s.as_str()),
126 _ => None,
127 }
128 }
129 }
130
131 #[derive(Debug, Clone, Serialize)]
132 pub struct MarketRow {
133 pub exchange: String,
134 pub ping: MethodResult,
136 pub price: MethodResult,
137 pub ticker: MethodResult,
138 pub orderbook: MethodResult,
139 pub klines: MethodResult,
140 pub trades: MethodResult,
141 pub exch_info: MethodResult,
142 pub funding: MethodResult,
144 pub open_interest: MethodResult,
145 pub mark_price: MethodResult,
146 pub long_short: MethodResult,
147 pub liquidations: MethodResult,
148 pub premium_index: MethodResult,
149 pub ws_ticker: MethodResult,
151 pub ws_trade: MethodResult,
152 pub ws_orderbook: MethodResult,
153 pub ws_kline: MethodResult,
154 pub ws_mark_price: MethodResult,
155 pub ws_funding: MethodResult,
156 pub ws_liquidation: MethodResult,
157 pub ws_oi: MethodResult,
158 pub ws_agg_trade: MethodResult,
159 pub issues: Vec<String>,
161 }
162
163 #[derive(Debug, Clone, Serialize)]
164 pub struct TradingRow {
165 pub exchange: String,
166 pub balance: MethodResult,
167 pub account_info: MethodResult,
168 pub open_orders: MethodResult,
169 pub user_trades: MethodResult,
170 pub positions: MethodResult,
171 pub fees: MethodResult,
172 pub issues: Vec<String>,
173 }
174
175 #[derive(Debug, Clone, Serialize)]
176 pub struct ExchangeReport {
177 pub market: Option<MarketRow>,
178 pub trading: Option<TradingRow>,
179 }
180}
181
182use result_types::{ExchangeReport, MarketRow, MethodResult, TradingRow};
183
184fn now_ms() -> i64 {
189 SystemTime::now()
190 .duration_since(UNIX_EPOCH)
191 .map(|d| d.as_millis() as i64)
192 .unwrap_or(0)
193}
194
195fn stale_threshold_ms() -> i64 { now_ms() - 5 * 60_000 }
196
197fn timestamp_unit_bug(ts: i64) -> bool {
198 let now = now_ms();
199 ts > 0 && ts < now / 100
200}
201
202fn timestamp_future_bug(ts: i64) -> bool { ts > now_ms() + 60_000 }
203
204fn truncate(s: &str, n: usize) -> String {
205 match s.char_indices().nth(n) {
206 Some((i, _)) => format!("{}…", &s[..i]),
207 None => s.to_string(),
208 }
209}
210
211mod market {
216 use super::*;
217
218 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
219 pub enum ExpectedKind { Ticker, Trade, Orderbook, Kline, MarkPrice, FundingRate, Liquidation, OpenInterest, AggTrade }
220
221 pub fn inspect_event(event: &StreamEvent, stale_ms: i64, expected_kind: ExpectedKind) -> (String, bool, Vec<String>) {
223 let mut issues: Vec<String> = Vec::new();
224
225 let (s, valid) = match event {
226 StreamEvent::Ticker { symbol, ticker: t } => {
227 if expected_kind != ExpectedKind::Ticker {
228 issues.push(format!("WRONG_TYPE: got Ticker, expected {:?}", expected_kind));
229 }
230 if t.last_price <= 0.0 { issues.push("last_price<=0".into()); }
231 if timestamp_unit_bug(t.timestamp) {
232 issues.push(format!("ts_unit_bug(seconds): {}", t.timestamp));
233 } else if timestamp_future_bug(t.timestamp) {
234 issues.push(format!("ts_future_bug: {}", t.timestamp));
235 } else if t.timestamp <= stale_ms {
236 issues.push(format!("ts_stale: {}min ago", (now_ms() - t.timestamp) / 60_000));
237 }
238 match (t.bid_price, t.ask_price) {
239 (Some(b), Some(a)) if b > a => issues.push(format!("bid>ask: {:.4}>{:.4}", b, a)),
240 _ => {}
243 }
244 let valid = t.last_price > 0.0
245 && !timestamp_unit_bug(t.timestamp)
246 && !timestamp_future_bug(t.timestamp)
247 && t.timestamp > stale_ms;
248 (format!("Ticker sym={} last={:.4} bid={} ask={} ts={}",
249 symbol, t.last_price,
250 t.bid_price.map(|v| format!("{:.4}", v)).unwrap_or_else(|| "None".into()),
251 t.ask_price.map(|v| format!("{:.4}", v)).unwrap_or_else(|| "None".into()),
252 t.timestamp), valid)
253 }
254 StreamEvent::Trade { symbol, trade: t } => {
255 if expected_kind != ExpectedKind::Trade {
256 issues.push(format!("WRONG_TYPE: got Trade, expected {:?}", expected_kind));
257 }
258 let valid = t.price > 0.0 && t.quantity > 0.0;
259 if !valid { issues.push("price<=0 or qty<=0".into()); }
260 (format!("Trade sym={} px={:.4} qty={:.6} ts={}", symbol, t.price, t.quantity, t.timestamp), valid)
261 }
262 StreamEvent::OrderbookSnapshot { symbol, book: ob } => {
263 let top_bid = ob.bids.first().map(|l| l.price).unwrap_or(0.0);
264 let top_ask = ob.asks.first().map(|l| l.price).unwrap_or(0.0);
265 let truly_empty = (ob.bids.is_empty() && ob.asks.is_empty())
268 || (top_bid <= 0.0 && top_ask <= 0.0);
269 if truly_empty { issues.push("orderbook empty/zero".into()); }
270 let valid = !truly_empty;
271 (format!("OBSnapshot sym={} bids={} asks={} top_bid={:.4} top_ask={:.4}",
272 symbol, ob.bids.len(), ob.asks.len(), top_bid, top_ask), valid)
273 }
274 StreamEvent::OrderbookDelta { symbol, delta: od } => {
275 let has_data = !od.bids.is_empty() || !od.asks.is_empty();
277 let top_bid = od.bids.first().map(|l| l.price).unwrap_or(0.0);
278 (format!("OBDelta sym={} bids={} asks={} top_bid={:.4} ts={}",
279 symbol, od.bids.len(), od.asks.len(), top_bid, od.timestamp), has_data)
280 }
281 StreamEvent::Kline { symbol, interval, kline: k } => {
282 let valid = k.close > 0.0 && k.open > 0.0 && k.open_time > 0;
283 if !valid { issues.push("kline o/c<=0 or no open_time".into()); }
284 (format!("Kline sym={} iv={} o={:.4} h={:.4} l={:.4} c={:.4} vol={:.2} ts={}",
285 symbol, interval, k.open, k.high, k.low, k.close, k.volume, k.open_time), valid)
286 }
287 StreamEvent::MarkPrice { symbol, mark_price, timestamp, .. } => {
288 if expected_kind != ExpectedKind::MarkPrice {
289 issues.push(format!("WRONG_TYPE: got MarkPrice, expected {:?}", expected_kind));
290 }
291 let valid = *mark_price > 0.0 && *timestamp > stale_ms;
292 if !valid { issues.push("mark_price<=0 or stale".into()); }
293 (format!("MarkPrice sym={} px={:.4} ts={}", symbol, mark_price, timestamp), valid)
294 }
295 StreamEvent::FundingRate { symbol, rate, timestamp, .. } => {
296 if expected_kind == ExpectedKind::Ticker {
297 issues.push("WRONG_TYPE: got FundingRate while subscribed to Ticker".into());
298 }
299 (format!("FundingRate sym={} rate={:.6} ts={}", symbol, rate, timestamp), *timestamp > 0)
300 }
301 StreamEvent::Liquidation { symbol, price, quantity, timestamp, .. } => {
302 let valid = *price > 0.0 && *quantity > 0.0;
303 if !valid { issues.push("liquidation px/qty<=0".into()); }
304 (format!("Liquidation sym={} px={:.4} qty={:.6} ts={}", symbol, price, quantity, timestamp), valid)
305 }
306 StreamEvent::OpenInterestUpdate { symbol, open_interest, timestamp, .. } => {
307 let valid = *open_interest > 0.0;
308 if !valid { issues.push("open_interest<=0".into()); }
309 (format!("OI sym={} oi={:.2} ts={}", symbol, open_interest, timestamp), valid)
310 }
311 StreamEvent::AggTrade { symbol, price, quantity, timestamp, .. } => {
312 let valid = *price > 0.0 && *quantity > 0.0;
313 if !valid { issues.push("aggtrade px/qty<=0".into()); }
314 (format!("AggTrade sym={} px={:.4} qty={:.6} ts={}", symbol, price, quantity, timestamp), valid)
315 }
316 other => {
317 let s = format!("{:?}", other);
318 let short = truncate(&s, 80);
319 (short, true)
320 }
321 };
322 (s, valid, issues)
323 }
324
325 pub async fn collect_ws_stream(
328 ws: Arc<dyn WebSocketConnector>,
329 sub: SubscriptionRequest,
330 expected_kind: ExpectedKind,
331 stale_ms: i64,
332 budget_secs: u64,
333 ) -> MethodResult {
334 let account_type = sub.account_type;
335 match timeout(Duration::from_secs(8), ws.connect(account_type)).await {
336 Ok(Ok(())) => {}
337 Ok(Err(e)) => return MethodResult::Err(truncate(&e.to_string(), 60)),
338 Err(_) => return MethodResult::Err("connect_timeout".into()),
339 }
340 match timeout(Duration::from_secs(5), ws.subscribe(sub)).await {
341 Ok(Ok(())) => {}
342 Ok(Err(e)) => {
343 let msg = e.to_string();
344 if msg.contains("UnsupportedOperation") || msg.contains("not support")
345 || msg.contains("Not supported")
346 {
347 return MethodResult::Unsupported(truncate(&msg, 60));
348 }
349 return MethodResult::Err(format!("sub_fail: {}", truncate(&msg, 60)));
350 }
351 Err(_) => return MethodResult::Err("subscribe_timeout".into()),
352 }
353
354 let mut stream = ws.event_stream();
355 let mut event_count = 0u32;
356 let mut first_desc: Option<String> = None;
357 let mut all_issues: Vec<String> = Vec::new();
358 let mut wrong_type: Vec<String> = Vec::new();
359 let mut saw_expected = false;
360 let collect_start = Instant::now();
361 let budget = Duration::from_secs(budget_secs);
362
363 loop {
364 let remaining = budget.saturating_sub(collect_start.elapsed());
365 if remaining.is_zero() { break; }
366 match timeout(remaining, stream.next()).await {
367 Ok(Some(Ok(event))) => {
368 event_count += 1;
369 let is_expected = matches!((&event, expected_kind),
370 (StreamEvent::Ticker { .. }, ExpectedKind::Ticker) |
371 (StreamEvent::Trade { .. }, ExpectedKind::Trade) |
372 (StreamEvent::OrderbookSnapshot { .. } | StreamEvent::OrderbookDelta { .. }, ExpectedKind::Orderbook) |
373 (StreamEvent::Kline { .. }, ExpectedKind::Kline) |
374 (StreamEvent::MarkPrice { .. }, ExpectedKind::MarkPrice) |
375 (StreamEvent::FundingRate { .. }, ExpectedKind::FundingRate) |
376 (StreamEvent::Liquidation { .. }, ExpectedKind::Liquidation) |
377 (StreamEvent::OpenInterestUpdate { .. }, ExpectedKind::OpenInterest) |
378 (StreamEvent::AggTrade { .. }, ExpectedKind::AggTrade)
379 );
380 if is_expected { saw_expected = true; }
381 let (desc, _valid, issues) = inspect_event(&event, stale_ms, expected_kind);
382 if first_desc.is_none() { first_desc = Some(desc); }
383 for iss in issues {
384 if iss.starts_with("WRONG_TYPE") {
385 if !wrong_type.contains(&iss) { wrong_type.push(iss); }
386 } else if !all_issues.contains(&iss) {
387 all_issues.push(iss);
388 }
389 }
390 }
391 Ok(Some(Err(_))) | Ok(None) | Err(_) => break,
392 }
393 }
394
395 if !saw_expected { all_issues.extend(wrong_type); }
396
397 if event_count == 0 {
398 MethodResult::Err("silent_0_events".into())
399 } else {
400 let desc = first_desc.unwrap_or_else(|| "?".into());
401 if all_issues.is_empty() {
402 MethodResult::Ok(format!("cnt={} {}", event_count, truncate(&desc, 80)))
403 } else {
404 MethodResult::Err(format!("cnt={} ISSUES[{}] {}", event_count, all_issues.join(";"), truncate(&desc, 60)))
405 }
406 }
407 }
408
409 pub async fn run_ws_sub(
413 exchange: ExchangeId,
414 account_type: AccountType,
415 stream_type: StreamType,
416 symbol: Symbol,
417 expected_kind: ExpectedKind,
418 stale_ms: i64,
419 budget_secs: u64,
420 ) -> MethodResult {
421 let hub = ExchangeHub::new();
422 match timeout(Duration::from_secs(8), hub.connect_websocket(exchange, account_type, false)).await {
423 Ok(Ok(())) => {}
424 Ok(Err(e)) => {
425 let msg = e.to_string();
426 if msg.contains("UnsupportedOperation") || msg.contains("not support")
427 || msg.contains("Not supported")
428 {
429 return MethodResult::Unsupported(truncate(&msg, 60));
430 }
431 return MethodResult::Err(format!("connect_fail: {}", truncate(&msg, 60)));
432 }
433 Err(_) => return MethodResult::Err("ws_connect_timeout".into()),
434 }
435 match hub.ws(exchange, account_type) {
436 Some(ws) => {
437 let sub = SubscriptionRequest {
438 symbol,
439 stream_type,
440 account_type,
441 depth: None,
442 update_speed_ms: None,
443 };
444 collect_ws_stream(ws, sub, expected_kind, stale_ms, budget_secs).await
445 }
446 None => MethodResult::Err("ws_none_after_connect".into()),
447 }
448 }
449}
450
451fn raw_symbol_for(id: ExchangeId) -> (Symbol, String, AccountType) {
456 let btc_usdt = Symbol::new("BTC", "USDT");
457 let btc_usd = Symbol::new("BTC", "USD");
458
459 let make = |sym: Symbol, at: AccountType| -> (Symbol, String, AccountType) {
460 let raw = SymbolNormalizer::to_exchange(id, &sym, at)
461 .unwrap_or_else(|_| sym.to_concat());
462 let sym_with_raw = Symbol::with_raw(&sym.base, &sym.quote, raw.clone());
463 (sym_with_raw, raw, at)
464 };
465
466 match id {
467 ExchangeId::Deribit => make(btc_usd, AccountType::FuturesCross),
468 ExchangeId::HyperLiquid => make(btc_usd, AccountType::FuturesCross),
469 ExchangeId::Upbit => {
470 let btc_krw = Symbol::new("BTC", "KRW");
471 let raw = SymbolNormalizer::to_exchange(id, &btc_krw, AccountType::Spot)
472 .unwrap_or_else(|_| "KRW-BTC".to_string());
473 let sym_with_raw = Symbol::with_raw("BTC", "KRW", raw.clone());
474 (sym_with_raw, raw, AccountType::Spot)
475 }
476 ExchangeId::Bitfinex => make(btc_usd, AccountType::Spot),
477 ExchangeId::Gemini => make(btc_usd, AccountType::Spot),
478 ExchangeId::Bitstamp => make(btc_usd, AccountType::Spot),
479 ExchangeId::Kraken => make(btc_usd, AccountType::Spot),
480 ExchangeId::Coinbase => make(btc_usd, AccountType::Spot),
481 ExchangeId::KuCoin => make(btc_usdt, AccountType::Spot),
482 ExchangeId::OKX => make(btc_usdt, AccountType::Spot),
483 ExchangeId::GateIO => make(btc_usdt, AccountType::Spot),
484 ExchangeId::BingX => make(btc_usdt, AccountType::Spot),
485 ExchangeId::CryptoCom => make(btc_usdt, AccountType::Spot),
486 ExchangeId::Dydx => make(btc_usd, AccountType::FuturesCross),
487 ExchangeId::YahooFinance => {
488 let btc = Symbol::new("BTC", "USD");
489 let raw = SymbolNormalizer::to_exchange(id, &btc, AccountType::Spot)
490 .unwrap_or_else(|_| "BTC-USD".to_string());
491 (Symbol::with_raw("BTC", "USD", raw.clone()), raw, AccountType::Spot)
492 }
493 ExchangeId::Polymarket => {
494 let sym = Symbol::with_raw("DISCOVER", "USDC", "DISCOVER".to_string());
495 (sym, "DISCOVER".to_string(), AccountType::Spot)
496 }
497 _ => make(btc_usdt, AccountType::Spot),
498 }
499}
500
501fn liq_symbol_for(id: ExchangeId) -> Symbol {
507 match id {
508 ExchangeId::Binance => Symbol::with_raw("", "", "".to_string()),
509 ExchangeId::GateIO => Symbol::with_raw("", "", "!all".to_string()),
510 _ => {
511 let (sym, ..) = raw_symbol_for(id);
513 sym
514 }
515 }
516}
517
518fn no_bid_ask_by_design(id: ExchangeId) -> bool {
525 matches!(id,
526 ExchangeId::CryptoCompare
527 | ExchangeId::YahooFinance
528 | ExchangeId::Twelvedata
529 | ExchangeId::AlphaVantage
530 | ExchangeId::Tiingo
531 | ExchangeId::Fred
532 | ExchangeId::DefiLlama
533 | ExchangeId::Coinglass
534 | ExchangeId::Dukascopy
535 | ExchangeId::Moex
536 | ExchangeId::Krx
537 | ExchangeId::JQuants
538 | ExchangeId::Bls
539 )
540}
541
542fn is_futures(id: ExchangeId, at: AccountType) -> bool {
544 matches!(at, AccountType::FuturesCross | AccountType::FuturesIsolated)
545 || matches!(id, ExchangeId::Binance | ExchangeId::Bybit | ExchangeId::OKX
546 | ExchangeId::KuCoin | ExchangeId::GateIO | ExchangeId::MEXC
547 | ExchangeId::HTX | ExchangeId::Bitget | ExchangeId::BingX
548 | ExchangeId::CryptoCom | ExchangeId::Deribit | ExchangeId::HyperLiquid
549 | ExchangeId::Lighter | ExchangeId::Dydx | ExchangeId::Coinglass)
550}
551
552async fn test_market(id: ExchangeId) -> MarketRow {
557 let (sym, raw_str, account_type) = raw_symbol_for(id);
558 let stale_ms = stale_threshold_ms();
559 let futures_capable = is_futures(id, account_type);
560
561 let hub = ExchangeHub::new();
563 let connected = match timeout(Duration::from_secs(12), hub.connect_public(id, false)).await {
564 Ok(Ok(())) => true,
565 Ok(Err(e)) => {
566 let err_msg = truncate(&e.to_string(), 70);
567 return MarketRow {
568 exchange: format!("{:?}", id),
569 ping: MethodResult::Err(format!("connect_fail: {}", err_msg)),
570 price: MethodResult::Skipped,
571 ticker: MethodResult::Skipped,
572 orderbook: MethodResult::Skipped,
573 klines: MethodResult::Skipped,
574 trades: MethodResult::Skipped,
575 exch_info: MethodResult::Skipped,
576 funding: MethodResult::Skipped,
577 open_interest: MethodResult::Skipped,
578 mark_price: MethodResult::Skipped,
579 long_short: MethodResult::Skipped,
580 liquidations: MethodResult::Skipped,
581 premium_index: MethodResult::Skipped,
582 ws_ticker: MethodResult::Skipped,
583 ws_trade: MethodResult::Skipped,
584 ws_orderbook: MethodResult::Skipped,
585 ws_kline: MethodResult::Skipped,
586 ws_mark_price: MethodResult::Skipped,
587 ws_funding: MethodResult::Skipped,
588 ws_liquidation: MethodResult::Skipped,
589 ws_oi: MethodResult::Skipped,
590 ws_agg_trade: MethodResult::Skipped,
591 issues: vec![format!("connect_fail: {}", err_msg)],
592 };
593 }
594 Err(_) => {
595 return MarketRow {
596 exchange: format!("{:?}", id),
597 ping: MethodResult::Err("connect_timeout".into()),
598 price: MethodResult::Skipped, ticker: MethodResult::Skipped,
599 orderbook: MethodResult::Skipped, klines: MethodResult::Skipped,
600 trades: MethodResult::Skipped, exch_info: MethodResult::Skipped,
601 funding: MethodResult::Skipped, open_interest: MethodResult::Skipped,
602 mark_price: MethodResult::Skipped, long_short: MethodResult::Skipped,
603 liquidations: MethodResult::Skipped, premium_index: MethodResult::Skipped,
604 ws_ticker: MethodResult::Skipped, ws_trade: MethodResult::Skipped,
605 ws_orderbook: MethodResult::Skipped, ws_kline: MethodResult::Skipped,
606 ws_mark_price: MethodResult::Skipped, ws_funding: MethodResult::Skipped,
607 ws_liquidation: MethodResult::Skipped, ws_oi: MethodResult::Skipped,
608 ws_agg_trade: MethodResult::Skipped,
609 issues: vec!["connect_timeout".into()],
610 };
611 }
612 };
613 let _ = connected;
614
615 let conn = match hub.rest(id) {
616 Some(c) => c,
617 None => {
618 return MarketRow {
619 exchange: format!("{:?}", id),
620 ping: MethodResult::Err("no_rest_handle".into()),
621 price: MethodResult::Skipped, ticker: MethodResult::Skipped,
622 orderbook: MethodResult::Skipped, klines: MethodResult::Skipped,
623 trades: MethodResult::Skipped, exch_info: MethodResult::Skipped,
624 funding: MethodResult::Skipped, open_interest: MethodResult::Skipped,
625 mark_price: MethodResult::Skipped, long_short: MethodResult::Skipped,
626 liquidations: MethodResult::Skipped, premium_index: MethodResult::Skipped,
627 ws_ticker: MethodResult::Skipped, ws_trade: MethodResult::Skipped,
628 ws_orderbook: MethodResult::Skipped, ws_kline: MethodResult::Skipped,
629 ws_mark_price: MethodResult::Skipped, ws_funding: MethodResult::Skipped,
630 ws_liquidation: MethodResult::Skipped, ws_oi: MethodResult::Skipped,
631 ws_agg_trade: MethodResult::Skipped,
632 issues: vec!["no_rest_handle".into()],
633 };
634 }
635 };
636
637 let caps = hub.capabilities(id).unwrap_or_default();
638 let sym_input_str = raw_str.clone();
639
640 let ping = {
644 let conn = conn.clone();
645 match timeout(Duration::from_secs(10), conn.ping()).await {
646 Ok(Ok(())) => MethodResult::Ok("pong".into()),
647 Ok(Err(e)) => {
648 let msg = e.to_string();
649 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
650 else { MethodResult::Err(truncate(&msg, 60)) }
651 }
652 Err(_) => MethodResult::Timeout,
653 }
654 };
655
656 let price = if !caps.has_ticker {
658 MethodResult::Skipped
659 } else {
660 let conn = conn.clone();
661 let sym_str = sym_input_str.clone();
662 match timeout(Duration::from_secs(10),
663 conn.get_price(sym_str.as_str().into(), account_type)).await {
664 Ok(Ok(p)) if p > 0.0 => MethodResult::Ok(format!("price={:.4}", p)),
665 Ok(Ok(_)) => MethodResult::Empty,
666 Ok(Err(e)) => {
667 let msg = e.to_string();
668 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
669 else { MethodResult::Err(truncate(&msg, 60)) }
670 }
671 Err(_) => MethodResult::Timeout,
672 }
673 };
674
675 let ticker = if !caps.has_ticker {
677 MethodResult::Skipped
678 } else {
679 let conn = conn.clone();
680 let sym_str = sym_input_str.clone();
681 match timeout(Duration::from_secs(10),
682 MarketData::get_ticker(&*conn, sym_str.as_str().into(), account_type)).await {
683 Ok(Ok(t)) => {
684 let mut issues: Vec<String> = Vec::new();
685 if t.last_price <= 0.0 { issues.push("last=0".into()); }
686 if timestamp_unit_bug(t.timestamp) { issues.push(format!("ts_unit_bug:{}", t.timestamp)); }
687 else if timestamp_future_bug(t.timestamp) { issues.push(format!("ts_future_bug:{}", t.timestamp)); }
688 else if t.timestamp == 0 { issues.push("ts_missing".into()); }
689 match (t.bid_price, t.ask_price) {
690 (Some(b), Some(a)) if b > a => issues.push(format!("bid>ask")),
691 (None, None) if !no_bid_ask_by_design(id) => issues.push("bid/ask None".into()),
692 _ => {}
693 }
694 let desc = format!("last={:.4} bid={} ask={} ts={}",
695 t.last_price,
696 t.bid_price.map(|v| format!("{:.4}", v)).unwrap_or_else(|| "None".into()),
697 t.ask_price.map(|v| format!("{:.4}", v)).unwrap_or_else(|| "None".into()),
698 if t.timestamp == 0 { "MISSING".to_string() } else { format!("{}s_ago", (now_ms() - t.timestamp) / 1000) });
699 if issues.is_empty() && t.last_price > 0.0 {
700 MethodResult::Ok(desc)
701 } else if t.last_price > 0.0 {
702 MethodResult::Err(format!("{} ISSUES:{}", desc, issues.join(",")))
703 } else {
704 MethodResult::Empty
705 }
706 }
707 Ok(Err(e)) => {
708 let msg = e.to_string();
709 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
710 else { MethodResult::Err(truncate(&msg, 60)) }
711 }
712 Err(_) => MethodResult::Timeout,
713 }
714 };
715
716 let orderbook = if !caps.has_orderbook {
718 MethodResult::Skipped
719 } else {
720 let conn = conn.clone();
721 let sym_str = sym_input_str.clone();
722 match timeout(Duration::from_secs(10),
723 conn.get_orderbook(sym_str.as_str().into(), Some(10), account_type)).await {
724 Ok(Ok(ob)) => {
725 if ob.bids.is_empty() || ob.asks.is_empty() {
726 MethodResult::Empty
727 } else {
728 let top_bid = ob.bids.first().map(|l| l.price).unwrap_or(0.0);
729 let top_ask = ob.asks.first().map(|l| l.price).unwrap_or(0.0);
730 if top_bid >= top_ask && top_ask > 0.0 {
731 MethodResult::Err(format!("bid={:.4}>=ask={:.4}", top_bid, top_ask))
732 } else {
733 MethodResult::Ok(format!("bids={} asks={} top_bid={:.4} top_ask={:.4}",
734 ob.bids.len(), ob.asks.len(), top_bid, top_ask))
735 }
736 }
737 }
738 Ok(Err(e)) => {
739 let msg = e.to_string();
740 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
741 else { MethodResult::Err(truncate(&msg, 60)) }
742 }
743 Err(_) => MethodResult::Timeout,
744 }
745 };
746
747 let klines = if !caps.has_klines {
749 MethodResult::Skipped
750 } else {
751 let conn = conn.clone();
752 let sym_str = sym_input_str.clone();
753 match timeout(Duration::from_secs(12),
754 conn.get_klines(sym_str.as_str().into(), "1m", Some(5), account_type, None)).await {
755 Ok(Ok(ks)) if ks.is_empty() => MethodResult::Empty,
756 Ok(Ok(ks)) => {
757 let last = ks.last().unwrap();
758 if last.close <= 0.0 {
759 MethodResult::Err(format!("close={}", last.close))
760 } else {
761 MethodResult::Ok(format!("len={} last_close={:.4}", ks.len(), last.close))
762 }
763 }
764 Ok(Err(e)) => {
765 let msg = e.to_string();
766 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
767 else { MethodResult::Err(truncate(&msg, 60)) }
768 }
769 Err(_) => MethodResult::Timeout,
770 }
771 };
772
773 let trades = if !caps.has_recent_trades {
775 MethodResult::Skipped
776 } else {
777 let conn = conn.clone();
778 let sym_str = sym_input_str.clone();
779 match timeout(Duration::from_secs(10),
780 conn.get_recent_trades(sym_str.as_str().into(), Some(10), account_type)).await {
781 Ok(Ok(ts)) if ts.is_empty() => MethodResult::Empty,
782 Ok(Ok(ts)) => {
783 let first = ts.first().unwrap();
784 if first.price <= 0.0 {
785 MethodResult::Err(format!("price={}", first.price))
786 } else {
787 MethodResult::Ok(format!("len={} first_px={:.4}", ts.len(), first.price))
788 }
789 }
790 Ok(Err(e)) => {
791 let msg = e.to_string();
792 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
793 else { MethodResult::Err(truncate(&msg, 60)) }
794 }
795 Err(_) => MethodResult::Timeout,
796 }
797 };
798
799 let exch_info = if !caps.has_exchange_info {
801 MethodResult::Skipped
802 } else {
803 let conn = conn.clone();
804 match timeout(Duration::from_secs(15),
805 conn.get_exchange_info(account_type)).await {
806 Ok(Ok(infos)) if infos.is_empty() => MethodResult::Empty,
807 Ok(Ok(infos)) => MethodResult::Ok(format!("symbols={}", infos.len())),
808 Ok(Err(e)) => {
809 let msg = e.to_string();
810 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
811 else { MethodResult::Err(truncate(&msg, 60)) }
812 }
813 Err(_) => MethodResult::Timeout,
814 }
815 };
816
817 let fut_sym = raw_str.clone();
820 let futures_at = if matches!(account_type, AccountType::FuturesCross | AccountType::FuturesIsolated) {
821 account_type
822 } else {
823 AccountType::FuturesCross
824 };
825
826 let funding = if !futures_capable || !caps.has_funding_payments {
828 MethodResult::Skipped
829 } else {
830 let conn = conn.clone();
831 let s = fut_sym.clone();
832 match timeout(Duration::from_secs(10),
833 conn.get_funding_rate(&s, futures_at)).await {
834 Ok(Ok(fr)) => MethodResult::Ok(format!("rate={:.6} next={:?}", fr.rate, fr.next_funding_time)),
835 Ok(Err(e)) => {
836 let msg = e.to_string();
837 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
838 else { MethodResult::Err(truncate(&msg, 60)) }
839 }
840 Err(_) => MethodResult::Timeout,
841 }
842 };
843
844 let open_interest = if !futures_capable {
846 MethodResult::Skipped
847 } else {
848 let conn = conn.clone();
849 let s = fut_sym.clone();
850 match timeout(Duration::from_secs(10),
851 conn.get_open_interest(&s, futures_at)).await {
852 Ok(Ok(oi)) if oi.open_interest <= 0.0 => MethodResult::Empty,
853 Ok(Ok(oi)) => MethodResult::Ok(format!("oi={:.2}", oi.open_interest)),
854 Ok(Err(e)) => {
855 let msg = e.to_string();
856 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
857 else { MethodResult::Err(truncate(&msg, 60)) }
858 }
859 Err(_) => MethodResult::Timeout,
860 }
861 };
862
863 let mark_price = if !futures_capable || !caps.has_mark_price {
865 MethodResult::Skipped
866 } else {
867 let conn = conn.clone();
868 let s = fut_sym.clone();
869 match timeout(Duration::from_secs(10),
870 conn.get_mark_price(&s)).await {
871 Ok(Ok(mp)) if mp.mark_price <= 0.0 => MethodResult::Empty,
872 Ok(Ok(mp)) => MethodResult::Ok(format!("mark={:.4}", mp.mark_price)),
873 Ok(Err(e)) => {
874 let msg = e.to_string();
875 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
876 else { MethodResult::Err(truncate(&msg, 60)) }
877 }
878 Err(_) => MethodResult::Timeout,
879 }
880 };
881
882 let long_short = if !futures_capable || !caps.has_long_short_ratio {
884 MethodResult::Skipped
885 } else {
886 let conn = conn.clone();
887 let s = fut_sym.clone();
888 match timeout(Duration::from_secs(10),
889 conn.get_long_short_ratio(&s, futures_at)).await {
890 Ok(Ok(ls)) => MethodResult::Ok(format!("long={:.4} short={:.4}", ls.long_ratio, ls.short_ratio)),
891 Ok(Err(e)) => {
892 let msg = e.to_string();
893 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
894 else { MethodResult::Err(truncate(&msg, 60)) }
895 }
896 Err(_) => MethodResult::Timeout,
897 }
898 };
899
900 let liquidations = if !futures_capable || !caps.has_liquidation_history {
902 MethodResult::Skipped
903 } else {
904 let conn = conn.clone();
905 let sym_str = sym_input_str.clone();
906 match timeout(Duration::from_secs(10),
907 conn.get_liquidation_history(
908 Some(SymbolInput::Raw(&sym_str)),
909 None, None, Some(5), futures_at)).await {
910 Ok(Ok(ls)) if ls.is_empty() => MethodResult::Empty,
911 Ok(Ok(ls)) => MethodResult::Ok(format!("len={}", ls.len())),
912 Ok(Err(e)) => {
913 let msg = e.to_string();
914 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
915 else { MethodResult::Err(truncate(&msg, 60)) }
916 }
917 Err(_) => MethodResult::Timeout,
918 }
919 };
920
921 let premium_index = if !futures_capable || !caps.has_premium_index {
923 MethodResult::Skipped
924 } else {
925 let conn = conn.clone();
926 let sym_str = sym_input_str.clone();
927 match timeout(Duration::from_secs(10),
928 conn.get_premium_index(Some(SymbolInput::Raw(&sym_str)), futures_at)).await {
929 Ok(Ok(ps)) if ps.is_empty() => MethodResult::Empty,
930 Ok(Ok(ps)) => MethodResult::Ok(format!("len={} mark={:.4}", ps.len(),
931 ps.first().map(|p| p.mark_price).unwrap_or(0.0))),
932 Ok(Err(e)) => {
933 let msg = e.to_string();
934 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
935 else { MethodResult::Err(truncate(&msg, 60)) }
936 }
937 Err(_) => MethodResult::Timeout,
938 }
939 };
940
941 let sym_ws = sym.clone();
945 let sym_ws2 = sym.clone();
946 let sym_ws3 = sym.clone();
947 let sym_ws4 = sym.clone();
948 let sym_ws5 = sym.clone();
949 let sym_ws6 = sym.clone();
950 let sym_ws7 = sym.clone();
951 let sym_ws8 = sym.clone();
952 let sym_ws9 = sym.clone();
953
954 let ws_ticker_fut = async {
955 if !caps.has_ws_ticker { return MethodResult::Skipped; }
956 market::run_ws_sub(id, account_type, StreamType::Ticker, sym_ws, market::ExpectedKind::Ticker, stale_ms, 10).await
957 };
958 let ws_trade_fut = async {
959 if !caps.has_ws_trades { return MethodResult::Skipped; }
960 market::run_ws_sub(id, account_type, StreamType::Trade, sym_ws2, market::ExpectedKind::Trade, stale_ms, 10).await
961 };
962 let ws_ob_fut = async {
963 if !caps.has_ws_orderbook { return MethodResult::Skipped; }
964 market::run_ws_sub(id, account_type, StreamType::Orderbook, sym_ws3, market::ExpectedKind::Orderbook, stale_ms, 10).await
965 };
966 let ws_kline_fut = async {
967 if !caps.has_ws_klines { return MethodResult::Skipped; }
968 market::run_ws_sub(id, account_type, StreamType::Kline { interval: "1m".into() }, sym_ws4, market::ExpectedKind::Kline, stale_ms, 10).await
969 };
970 let ws_mark_fut = async {
971 if !futures_capable || !caps.has_ws_mark_price { return MethodResult::Skipped; }
972 market::run_ws_sub(id, futures_at, StreamType::MarkPrice, sym_ws5, market::ExpectedKind::MarkPrice, stale_ms, 10).await
973 };
974 let ws_funding_fut = async {
975 if !futures_capable || !caps.has_ws_funding_rate { return MethodResult::Skipped; }
976 market::run_ws_sub(id, futures_at, StreamType::FundingRate, sym_ws6, market::ExpectedKind::FundingRate, stale_ms, 10).await
977 };
978 let ws_liq_fut = async {
979 if !futures_capable { return MethodResult::Skipped; }
980 let liq_sym = liq_symbol_for(id);
987 drop(sym_ws7); if id == ExchangeId::Bybit {
989 let bybit_liq_syms: &[&str] = &["BTCUSDT", "ETHUSDT", "SOLUSDT", "XRPUSDT", "DOGEUSDT"];
999 let mut handles = Vec::new();
1000 for &sym_str in bybit_liq_syms {
1001 let sym = Symbol::with_raw("", "", sym_str.to_string());
1002 let h = tokio::spawn(market::run_ws_sub(
1003 ExchangeId::Bybit,
1004 futures_at,
1005 StreamType::Liquidation,
1006 sym,
1007 market::ExpectedKind::Liquidation,
1008 stale_ms,
1009 60,
1010 ));
1011 handles.push(h);
1012 }
1013 let mut last = MethodResult::Err("silent_0_events".into());
1015 for h in handles {
1016 if let Ok(r) = h.await {
1017 match &r {
1018 MethodResult::Ok(_) => return r,
1019 other => last = other.clone(),
1020 }
1021 }
1022 }
1023 last
1024 } else {
1025 market::run_ws_sub(id, futures_at, StreamType::Liquidation, liq_sym, market::ExpectedKind::Liquidation, stale_ms, 30).await
1026 }
1027 };
1028 let ws_oi_fut = async {
1029 if !futures_capable { return MethodResult::Skipped; }
1030 market::run_ws_sub(id, futures_at, StreamType::OpenInterest, sym_ws8, market::ExpectedKind::OpenInterest, stale_ms, 20).await
1032 };
1033 let ws_agg_fut = async {
1034 if !futures_capable { return MethodResult::Skipped; }
1035 market::run_ws_sub(id, futures_at, StreamType::AggTrade, sym_ws9, market::ExpectedKind::AggTrade, stale_ms, 10).await
1036 };
1037
1038 let (ws_ticker, ws_trade, ws_orderbook, ws_kline, ws_mark_price, ws_funding, ws_liquidation, ws_oi, ws_agg_trade) =
1039 tokio::join!(
1040 ws_ticker_fut, ws_trade_fut, ws_ob_fut, ws_kline_fut,
1041 ws_mark_fut, ws_funding_fut, ws_liq_fut, ws_oi_fut, ws_agg_fut
1042 );
1043
1044 let mut issues: Vec<String> = Vec::new();
1046 let method_cells = [
1047 ("ping", &ping), ("price", &price), ("ticker", &ticker),
1048 ("orderbook", &orderbook), ("klines", &klines), ("trades", &trades),
1049 ("exch_info", &exch_info), ("funding", &funding), ("OI", &open_interest),
1050 ("mark_px", &mark_price), ("ls_ratio", &long_short), ("liquidations", &liquidations),
1051 ("premium_idx", &premium_index),
1052 ("WS_ticker", &ws_ticker), ("WS_trade", &ws_trade), ("WS_ob", &ws_orderbook),
1053 ("WS_kline", &ws_kline), ("WS_mark", &ws_mark_price), ("WS_funding", &ws_funding),
1054 ("WS_liq", &ws_liquidation), ("WS_oi", &ws_oi), ("WS_agg", &ws_agg_trade),
1055 ];
1056 for (name, result) in &method_cells {
1057 if result.is_issue() {
1058 if let Some(d) = result.detail() {
1059 issues.push(format!("{}: {}", name, d));
1060 } else {
1061 issues.push(format!("{}: {:?}", name, result));
1062 }
1063 }
1064 }
1065
1066 MarketRow {
1067 exchange: format!("{:?}", id),
1068 ping, price, ticker, orderbook, klines, trades, exch_info,
1069 funding, open_interest, mark_price, long_short, liquidations, premium_index,
1070 ws_ticker, ws_trade, ws_orderbook: ws_orderbook, ws_kline, ws_mark_price, ws_funding,
1071 ws_liquidation, ws_oi, ws_agg_trade,
1072 issues,
1073 }
1074}
1075
1076mod trading {
1081 use super::*;
1082
1083 pub fn load_credentials(id: ExchangeId) -> Option<Credentials> {
1085 let (key_env, secret_env, pass_env): (&str, &str, Option<&str>) = match id {
1086 ExchangeId::Binance => ("BINANCE_API_KEY", "BINANCE_API_SECRET", None),
1087 ExchangeId::Bybit => ("BYBIT_API_KEY", "BYBIT_API_SECRET", None),
1088 ExchangeId::OKX => ("OKX_API_KEY", "OKX_API_SECRET", Some("OKX_PASSPHRASE")),
1089 ExchangeId::KuCoin => ("KUCOIN_API_KEY", "KUCOIN_API_SECRET", Some("KUCOIN_PASSPHRASE")),
1090 ExchangeId::GateIO => ("GATEIO_API_KEY", "GATEIO_API_SECRET", None),
1091 ExchangeId::MEXC => ("MEXC_API_KEY", "MEXC_API_SECRET", None),
1092 ExchangeId::HTX => ("HTX_API_KEY", "HTX_API_SECRET", None),
1093 ExchangeId::Bitget => ("BITGET_API_KEY", "BITGET_API_SECRET", Some("BITGET_PASSPHRASE")),
1094 ExchangeId::BingX => ("BINGX_API_KEY", "BINGX_API_SECRET", None),
1095 ExchangeId::CryptoCom => ("CRYPTOCOM_API_KEY", "CRYPTOCOM_API_SECRET", None),
1096 ExchangeId::Bitfinex => ("BITFINEX_API_KEY", "BITFINEX_API_SECRET", None),
1097 ExchangeId::Gemini => ("GEMINI_API_KEY", "GEMINI_API_SECRET", None),
1098 ExchangeId::Bitstamp => ("BITSTAMP_API_KEY", "BITSTAMP_API_SECRET", None),
1099 ExchangeId::Kraken => ("KRAKEN_API_KEY", "KRAKEN_API_SECRET", None),
1100 ExchangeId::Coinbase => ("COINBASE_API_KEY", "COINBASE_API_SECRET", None),
1101 ExchangeId::Deribit => ("DERIBIT_API_KEY", "DERIBIT_API_SECRET", None),
1102 ExchangeId::HyperLiquid => ("HYPERLIQUID_API_KEY", "HYPERLIQUID_API_SECRET", None),
1103 ExchangeId::Dydx => ("DYDX_API_KEY", "DYDX_API_SECRET", None),
1104 ExchangeId::Upbit => ("UPBIT_API_KEY", "UPBIT_API_SECRET", None),
1105 _ => return None,
1106 };
1107
1108 let api_key = std::env::var(key_env).ok()?;
1109 let api_secret = std::env::var(secret_env).ok()?;
1110 let passphrase = pass_env.and_then(|e| std::env::var(e).ok());
1111
1112 if api_key.is_empty() || api_secret.is_empty() { return None; }
1113
1114 Some(Credentials { api_key, api_secret, passphrase, testnet: false })
1115 }
1116}
1117
1118async fn test_trading(id: ExchangeId) -> TradingRow {
1119 let harness = TestHarness::new();
1120 let conn = match harness.create_authenticated(id).await {
1121 None => {
1122 match trading::load_credentials(id) {
1124 None => {
1125 return TradingRow {
1126 exchange: format!("{:?}", id),
1127 balance: MethodResult::Skipped,
1128 account_info: MethodResult::Skipped,
1129 open_orders: MethodResult::Skipped,
1130 user_trades: MethodResult::Skipped,
1131 positions: MethodResult::Skipped,
1132 fees: MethodResult::Skipped,
1133 issues: vec!["no_credentials_in_env".into()],
1134 };
1135 }
1136 Some(_) => {
1137 return TradingRow {
1139 exchange: format!("{:?}", id),
1140 balance: MethodResult::Skipped,
1141 account_info: MethodResult::Skipped,
1142 open_orders: MethodResult::Skipped,
1143 user_trades: MethodResult::Skipped,
1144 positions: MethodResult::Skipped,
1145 fees: MethodResult::Skipped,
1146 issues: vec!["creds_in_env_but_not_dotenv".into()],
1147 };
1148 }
1149 }
1150 }
1151 Some(Err(e)) => {
1152 let msg = truncate(&e.to_string(), 70);
1153 return TradingRow {
1154 exchange: format!("{:?}", id),
1155 balance: MethodResult::Err(format!("auth_connect_fail: {}", msg)),
1156 account_info: MethodResult::Skipped,
1157 open_orders: MethodResult::Skipped,
1158 user_trades: MethodResult::Skipped,
1159 positions: MethodResult::Skipped,
1160 fees: MethodResult::Skipped,
1161 issues: vec![format!("auth_fail: {}", msg)],
1162 };
1163 }
1164 Some(Ok(c)) => c,
1165 };
1166
1167 let (_, raw_str, account_type) = raw_symbol_for(id);
1168 let futures_at = if matches!(account_type, AccountType::FuturesCross | AccountType::FuturesIsolated) {
1169 account_type
1170 } else {
1171 AccountType::FuturesCross
1172 };
1173
1174 let balance = {
1176 let conn = conn.clone();
1177 match timeout(Duration::from_secs(10),
1178 conn.get_balance(BalanceQuery { asset: None, account_type })).await {
1179 Ok(Ok(bs)) => MethodResult::Ok(format!("assets={}", bs.len())),
1180 Ok(Err(e)) => {
1181 let msg = e.to_string();
1182 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
1183 else { MethodResult::Err(truncate(&msg, 60)) }
1184 }
1185 Err(_) => MethodResult::Timeout,
1186 }
1187 };
1188
1189 let account_info = {
1191 let conn = conn.clone();
1192 match timeout(Duration::from_secs(10),
1193 conn.get_account_info(account_type)).await {
1194 Ok(Ok(_)) => MethodResult::Ok("ok".into()),
1195 Ok(Err(e)) => {
1196 let msg = e.to_string();
1197 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
1198 else { MethodResult::Err(truncate(&msg, 60)) }
1199 }
1200 Err(_) => MethodResult::Timeout,
1201 }
1202 };
1203
1204 let open_orders = {
1206 let conn = conn.clone();
1207 let s = raw_str.clone();
1208 match timeout(Duration::from_secs(10),
1209 conn.get_open_orders(Some(&s), account_type)).await {
1210 Ok(Ok(os)) => MethodResult::Ok(format!("count={}", os.len())),
1211 Ok(Err(e)) => {
1212 let msg = e.to_string();
1213 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
1214 else { MethodResult::Err(truncate(&msg, 60)) }
1215 }
1216 Err(_) => MethodResult::Timeout,
1217 }
1218 };
1219
1220 let user_trades = {
1222 let conn = conn.clone();
1223 let s = raw_str.clone();
1224 match timeout(Duration::from_secs(10),
1225 conn.get_user_trades(
1226 UserTradeFilter { symbol: Some(s), order_id: None, start_time: None, end_time: None, limit: Some(5) },
1227 account_type)).await {
1228 Ok(Ok(ts)) => MethodResult::Ok(format!("count={}", ts.len())),
1229 Ok(Err(e)) => {
1230 let msg = e.to_string();
1231 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
1232 else { MethodResult::Err(truncate(&msg, 60)) }
1233 }
1234 Err(_) => MethodResult::Timeout,
1235 }
1236 };
1237
1238 let positions = if !is_futures(id, account_type) {
1240 MethodResult::Skipped
1241 } else {
1242 let conn = conn.clone();
1243 match timeout(Duration::from_secs(10),
1244 conn.get_positions(PositionQuery { symbol: None, account_type: futures_at })).await {
1245 Ok(Ok(ps)) => MethodResult::Ok(format!("count={}", ps.len())),
1246 Ok(Err(e)) => {
1247 let msg = e.to_string();
1248 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
1249 else { MethodResult::Err(truncate(&msg, 60)) }
1250 }
1251 Err(_) => MethodResult::Timeout,
1252 }
1253 };
1254
1255 let fees = {
1257 let conn = conn.clone();
1258 let s = raw_str.clone();
1259 match timeout(Duration::from_secs(10), conn.get_fees(Some(&s))).await {
1260 Ok(Ok(f)) => MethodResult::Ok(format!("maker={:.6} taker={:.6}", f.maker_rate, f.taker_rate)),
1261 Ok(Err(e)) => {
1262 let msg = e.to_string();
1263 if msg.contains("UnsupportedOperation") || msg.contains("Not supported:") { MethodResult::Unsupported(truncate(&msg, 50)) }
1264 else { MethodResult::Err(truncate(&msg, 60)) }
1265 }
1266 Err(_) => MethodResult::Timeout,
1267 }
1268 };
1269
1270 let mut issues: Vec<String> = Vec::new();
1271 for (name, result) in [
1272 ("balance", &balance), ("account_info", &account_info),
1273 ("open_orders", &open_orders), ("user_trades", &user_trades),
1274 ("positions", &positions), ("fees", &fees),
1275 ] {
1276 if result.is_issue() {
1277 if let Some(d) = result.detail() {
1278 issues.push(format!("{}: {}", name, d));
1279 } else {
1280 issues.push(format!("{}: {:?}", name, result));
1281 }
1282 }
1283 }
1284
1285 TradingRow {
1286 exchange: format!("{:?}", id),
1287 balance, account_info, open_orders, user_trades, positions, fees,
1288 issues,
1289 }
1290}
1291
1292async fn test_moex_market() -> MarketRow {
1297 let hub = ExchangeHub::new();
1298 let symbol_moex = Symbol::new("GAZP", "");
1299 let symbol_moex_str = SymbolNormalizer::to_exchange(ExchangeId::Moex, &symbol_moex, AccountType::Spot)
1300 .unwrap_or_else(|_| "GAZP".to_string());
1301 let stale_ms = stale_threshold_ms();
1302 let account_type = AccountType::Spot;
1303
1304 let connected = match timeout(Duration::from_secs(10), hub.connect_public(ExchangeId::Moex, false)).await {
1305 Ok(Ok(())) => true,
1306 _ => false,
1307 };
1308
1309 let (ping, price, ticker, orderbook, klines, trades, exch_info) = if !connected {
1310 (MethodResult::Err("connect_fail".into()), MethodResult::Skipped, MethodResult::Skipped,
1311 MethodResult::Skipped, MethodResult::Skipped, MethodResult::Skipped, MethodResult::Skipped)
1312 } else {
1313 let conn = match hub.rest(ExchangeId::Moex) {
1314 Some(c) => c,
1315 None => {
1316 return MarketRow {
1317 exchange: "Moex".into(),
1318 ping: MethodResult::Err("no_rest_handle".into()),
1319 price: MethodResult::Skipped, ticker: MethodResult::Skipped,
1320 orderbook: MethodResult::Skipped, klines: MethodResult::Skipped,
1321 trades: MethodResult::Skipped, exch_info: MethodResult::Skipped,
1322 funding: MethodResult::Skipped, open_interest: MethodResult::Skipped,
1323 mark_price: MethodResult::Skipped, long_short: MethodResult::Skipped,
1324 liquidations: MethodResult::Skipped, premium_index: MethodResult::Skipped,
1325 ws_ticker: MethodResult::Skipped, ws_trade: MethodResult::Skipped,
1326 ws_orderbook: MethodResult::Skipped, ws_kline: MethodResult::Skipped,
1327 ws_mark_price: MethodResult::Skipped, ws_funding: MethodResult::Skipped,
1328 ws_liquidation: MethodResult::Skipped, ws_oi: MethodResult::Skipped,
1329 ws_agg_trade: MethodResult::Skipped,
1330 issues: vec!["no_rest_handle".into()],
1331 };
1332 }
1333 };
1334 let sym_str = symbol_moex_str.clone();
1335 let ping = match timeout(Duration::from_secs(8), conn.ping()).await {
1336 Ok(Ok(())) => MethodResult::Ok("pong".into()),
1337 Ok(Err(e)) => MethodResult::Err(truncate(&e.to_string(), 60)),
1338 Err(_) => MethodResult::Timeout,
1339 };
1340 let ticker = match timeout(Duration::from_secs(10),
1341 MarketData::get_ticker(&*conn, sym_str.as_str().into(), account_type)).await {
1342 Ok(Ok(t)) => {
1343 let mut issues = Vec::new();
1344 if t.last_price <= 0.0 { issues.push("last=0"); }
1345 if timestamp_unit_bug(t.timestamp) { issues.push("ts_unit_bug"); }
1346 if issues.is_empty() { MethodResult::Ok(format!("last={:.4}", t.last_price)) }
1347 else { MethodResult::Err(format!("last={:.4} ISSUES:{}", t.last_price, issues.join(","))) }
1348 }
1349 Ok(Err(e)) => MethodResult::Err(truncate(&e.to_string(), 60)),
1350 Err(_) => MethodResult::Timeout,
1351 };
1352 (ping, MethodResult::Skipped, ticker, MethodResult::Skipped, MethodResult::Skipped, MethodResult::Skipped, MethodResult::Skipped)
1353 };
1354
1355 let ws_ticker = {
1357 let ws = Arc::new(MoexWebSocket::new_public()) as Arc<dyn WebSocketConnector>;
1358 let moex_sym = Symbol::new("GAZP", "");
1359 let sub = SubscriptionRequest::ticker_for(moex_sym, AccountType::Spot);
1360 match timeout(Duration::from_secs(20),
1361 market::collect_ws_stream(ws, sub, market::ExpectedKind::Ticker, stale_ms, 5)).await {
1362 Ok(r) => r,
1363 Err(_) => MethodResult::Err("overall_timeout_20s".into()),
1364 }
1365 };
1366
1367 let mut issues: Vec<String> = Vec::new();
1368 for (name, result) in [("ping", &ping), ("ticker", &ticker), ("WS_ticker", &ws_ticker)] {
1369 if result.is_issue() {
1370 if let Some(d) = result.detail() { issues.push(format!("{}: {}", name, d)); }
1371 }
1372 }
1373
1374 MarketRow {
1375 exchange: "Moex".into(),
1376 ping, price, ticker, orderbook, klines, trades, exch_info,
1377 funding: MethodResult::Skipped, open_interest: MethodResult::Skipped,
1378 mark_price: MethodResult::Skipped, long_short: MethodResult::Skipped,
1379 liquidations: MethodResult::Skipped, premium_index: MethodResult::Skipped,
1380 ws_ticker, ws_trade: MethodResult::Skipped, ws_orderbook: MethodResult::Skipped,
1381 ws_kline: MethodResult::Skipped, ws_mark_price: MethodResult::Skipped,
1382 ws_funding: MethodResult::Skipped, ws_liquidation: MethodResult::Skipped,
1383 ws_oi: MethodResult::Skipped, ws_agg_trade: MethodResult::Skipped,
1384 issues,
1385 }
1386}
1387
1388mod report {
1393 use super::*;
1394
1395 pub fn print_market_matrix(rows: &[MarketRow]) {
1396 println!();
1397 println!("=== MARKET COVERAGE MATRIX ===");
1398 println!("{:<18} | REST | WS", "");
1399 println!("{:<18} | ping pric tick ob klin trad exch fund OI mark ls liq px | tick trad ob klin mark fund liq OI agg", "Exchange");
1400 println!("{}", "-".repeat(170));
1401 for row in rows {
1402 let rest_cells = [
1403 row.ping.cell(), row.price.cell(), row.ticker.cell(), row.orderbook.cell(),
1404 row.klines.cell(), row.trades.cell(), row.exch_info.cell(),
1405 row.funding.cell(), row.open_interest.cell(), row.mark_price.cell(),
1406 row.long_short.cell(), row.liquidations.cell(), row.premium_index.cell(),
1407 ];
1408 let ws_cells = [
1409 row.ws_ticker.cell(), row.ws_trade.cell(), row.ws_orderbook.cell(),
1410 row.ws_kline.cell(), row.ws_mark_price.cell(), row.ws_funding.cell(),
1411 row.ws_liquidation.cell(), row.ws_oi.cell(), row.ws_agg_trade.cell(),
1412 ];
1413 let rest_str = rest_cells.join(" ");
1414 let ws_str = ws_cells.join(" ");
1415 println!("{:<18} | {} | {}", row.exchange, rest_str, ws_str);
1416 }
1417 }
1418
1419 pub fn print_trading_matrix(rows: &[TradingRow]) {
1420 println!();
1421 println!("=== TRADING COVERAGE MATRIX ===");
1422 println!("{:<18} | balance acc_info open_ord usr_trd positions fees", "Exchange");
1423 println!("{}", "-".repeat(80));
1424 for row in rows {
1425 let cells = [
1426 row.balance.cell(), row.account_info.cell(), row.open_orders.cell(),
1427 row.user_trades.cell(), row.positions.cell(), row.fees.cell(),
1428 ];
1429 println!("{:<18} | {}", row.exchange, cells.join(" "));
1430 }
1431 }
1432
1433 pub fn print_summaries(market_rows: &[MarketRow], trading_rows: &[TradingRow]) {
1434 let trusted: Vec<&str> = market_rows.iter()
1436 .filter(|r| {
1437 r.ping.is_ok() && r.ticker.is_ok() && r.orderbook.is_ok() && r.klines.is_ok()
1438 && r.ws_ticker.is_ok()
1439 && r.issues.is_empty()
1440 })
1441 .map(|r| r.exchange.as_str())
1442 .collect();
1443 println!();
1444 println!("=== TRUSTED (ping+ticker+ob+klines OK, WS_ticker OK, no issues) ===");
1445 println!("Count: {}", trusted.len());
1446 for ex in &trusted { println!(" + {}", ex); }
1447
1448 let partial: Vec<&MarketRow> = market_rows.iter()
1450 .filter(|r| !r.issues.is_empty() && (r.ping.is_ok() || r.ticker.is_ok()))
1451 .collect();
1452 println!();
1453 println!("=== PARTIAL (some methods fail) ===");
1454 for row in &partial {
1455 println!(" {} | {}", row.exchange, row.issues.first().map(|s| s.as_str()).unwrap_or(""));
1456 }
1457
1458 let has_issues: Vec<&MarketRow> = market_rows.iter().filter(|r| !r.issues.is_empty()).collect();
1460 println!();
1461 println!("=== ISSUES BY EXCHANGE ===");
1462 for row in &has_issues {
1463 for iss in &row.issues {
1464 println!(" {:18} | {}", row.exchange, iss);
1465 }
1466 }
1467
1468 if !trading_rows.is_empty() {
1470 let trading_issues: Vec<&TradingRow> = trading_rows.iter().filter(|r| !r.issues.is_empty()).collect();
1471 if !trading_issues.is_empty() {
1472 println!();
1473 println!("=== TRADING ISSUES ===");
1474 for row in &trading_issues {
1475 for iss in &row.issues {
1476 println!(" {:18} | {}", row.exchange, iss);
1477 }
1478 }
1479 }
1480 }
1481 }
1482
1483 pub fn write_json(path: &str, reports: &[(ExchangeId, ExchangeReport)]) {
1484 let path_buf: std::path::PathBuf = if path == "auto" || path == "default" {
1486 std::path::PathBuf::from(format!(
1487 "target/harness_out/e2e_smoke_{}.json",
1488 now_ms()
1489 ))
1490 } else {
1491 std::path::PathBuf::from(path)
1492 };
1493 if let Some(parent) = path_buf.parent() {
1494 let _ = std::fs::create_dir_all(parent);
1495 }
1496 let json = serde_json::json!({
1497 "timestamp": now_ms(),
1498 "exchanges": reports.iter().map(|(id, r)| {
1499 serde_json::json!({
1500 "exchange": format!("{:?}", id),
1501 "report": r,
1502 })
1503 }).collect::<Vec<_>>(),
1504 });
1505 match std::fs::write(&path_buf, serde_json::to_string_pretty(&json).unwrap_or_default()) {
1506 Ok(()) => println!("JSON report written to {}", path_buf.display()),
1507 Err(e) => println!("Failed to write JSON: {}", e),
1508 }
1509 }
1510}
1511
1512fn all_testable_exchanges() -> Vec<ExchangeId> {
1517 vec![
1518 ExchangeId::Binance, ExchangeId::Bybit, ExchangeId::OKX, ExchangeId::KuCoin,
1519 ExchangeId::Kraken, ExchangeId::GateIO, ExchangeId::Bitfinex, ExchangeId::MEXC,
1520 ExchangeId::HTX, ExchangeId::BingX, ExchangeId::CryptoCom, ExchangeId::Upbit,
1521 ExchangeId::Deribit, ExchangeId::HyperLiquid, ExchangeId::Bitget,
1522 ExchangeId::Bitstamp, ExchangeId::Coinbase, ExchangeId::Gemini,
1523 ExchangeId::Dydx, ExchangeId::Lighter,
1524 ExchangeId::YahooFinance, ExchangeId::CryptoCompare, ExchangeId::Twelvedata,
1525 ExchangeId::Polymarket, ExchangeId::Dukascopy, ExchangeId::Alpaca,
1526 ExchangeId::Krx,
1527 ExchangeId::Polygon, ExchangeId::Finnhub, ExchangeId::Tiingo,
1528 ExchangeId::AlphaVantage, ExchangeId::AngelOne, ExchangeId::Zerodha,
1529 ExchangeId::Upstox, ExchangeId::Dhan, ExchangeId::Fyers,
1530 ExchangeId::Oanda, ExchangeId::JQuants, ExchangeId::Tinkoff,
1531 ExchangeId::Ib, ExchangeId::Futu, ExchangeId::Coinglass,
1532 ]
1533}
1534
1535#[tokio::main]
1540async fn main() -> Result<(), Box<dyn std::error::Error>> {
1541 let args = cli::Args::parse();
1542 let start = Instant::now();
1543
1544 let mut exchanges = all_testable_exchanges();
1545
1546 if let Some(ref filter) = args.exchange_filter {
1548 let filter_lc = filter.to_lowercase();
1549 exchanges.retain(|id| format!("{:?}", id).to_lowercase().contains(&filter_lc));
1550 if exchanges.is_empty() {
1551 eprintln!("No exchange matched filter '{}'", filter);
1552 return Ok(());
1553 }
1554 }
1555
1556 println!("=== e2e_smoke — digdigdig3 ===");
1557 println!("Exchanges: {} | market={} trading={}", exchanges.len(), args.run_market, args.run_trading);
1558 if let Some(ref f) = args.exchange_filter { println!("Filter: {}", f); }
1559 println!();
1560
1561 let mut all_reports: Vec<(ExchangeId, ExchangeReport)> = Vec::new();
1562 let mut market_rows: Vec<MarketRow> = Vec::new();
1563 let mut trading_rows: Vec<TradingRow> = Vec::new();
1564
1565 if args.run_market {
1567 let include_moex = args.exchange_filter.as_deref()
1569 .map(|f| "moex".contains(&f.to_lowercase()))
1570 .unwrap_or(true);
1571
1572 let mut market_handles: Vec<tokio::task::JoinHandle<MarketRow>> = exchanges
1573 .iter()
1574 .copied()
1575 .map(|id| {
1576 tokio::spawn(async move {
1577 timeout(Duration::from_secs(90), test_market(id))
1582 .await
1583 .unwrap_or_else(|_| MarketRow {
1584 exchange: format!("{:?}", id),
1585 ping: MethodResult::Err("HARD_TIMEOUT_90s".into()),
1586 price: MethodResult::Skipped, ticker: MethodResult::Skipped,
1587 orderbook: MethodResult::Skipped, klines: MethodResult::Skipped,
1588 trades: MethodResult::Skipped, exch_info: MethodResult::Skipped,
1589 funding: MethodResult::Skipped, open_interest: MethodResult::Skipped,
1590 mark_price: MethodResult::Skipped, long_short: MethodResult::Skipped,
1591 liquidations: MethodResult::Skipped, premium_index: MethodResult::Skipped,
1592 ws_ticker: MethodResult::Skipped, ws_trade: MethodResult::Skipped,
1593 ws_orderbook: MethodResult::Skipped, ws_kline: MethodResult::Skipped,
1594 ws_mark_price: MethodResult::Skipped, ws_funding: MethodResult::Skipped,
1595 ws_liquidation: MethodResult::Skipped, ws_oi: MethodResult::Skipped,
1596 ws_agg_trade: MethodResult::Skipped,
1597 issues: vec!["HARD_TIMEOUT_90s".into()],
1598 })
1599 })
1600 })
1601 .collect();
1602
1603 if include_moex {
1604 market_handles.push(tokio::spawn(async move {
1605 timeout(Duration::from_secs(35), test_moex_market())
1606 .await
1607 .unwrap_or_else(|_| MarketRow {
1608 exchange: "Moex".into(),
1609 ping: MethodResult::Err("HARD_TIMEOUT_35s".into()),
1610 price: MethodResult::Skipped, ticker: MethodResult::Skipped,
1611 orderbook: MethodResult::Skipped, klines: MethodResult::Skipped,
1612 trades: MethodResult::Skipped, exch_info: MethodResult::Skipped,
1613 funding: MethodResult::Skipped, open_interest: MethodResult::Skipped,
1614 mark_price: MethodResult::Skipped, long_short: MethodResult::Skipped,
1615 liquidations: MethodResult::Skipped, premium_index: MethodResult::Skipped,
1616 ws_ticker: MethodResult::Skipped, ws_trade: MethodResult::Skipped,
1617 ws_orderbook: MethodResult::Skipped, ws_kline: MethodResult::Skipped,
1618 ws_mark_price: MethodResult::Skipped, ws_funding: MethodResult::Skipped,
1619 ws_liquidation: MethodResult::Skipped, ws_oi: MethodResult::Skipped,
1620 ws_agg_trade: MethodResult::Skipped,
1621 issues: vec!["HARD_TIMEOUT_35s".into()],
1622 })
1623 }));
1624 }
1625
1626 let results = futures_util::future::join_all(market_handles).await;
1627 market_rows = results.into_iter().filter_map(|r| r.ok()).collect();
1628 market_rows.sort_by_key(|r| r.exchange.clone());
1629
1630 println!("=== PER-EXCHANGE DETAILS (issues only) ===");
1632 for row in &market_rows {
1633 if !row.issues.is_empty() {
1634 println!("{:18} | ISSUES: {}", row.exchange, row.issues.join(" | "));
1635 }
1636 }
1637
1638 report::print_market_matrix(&market_rows);
1639
1640 for row in &market_rows {
1642 let id = exchanges.iter().find(|&&id| format!("{:?}", id) == row.exchange)
1643 .copied()
1644 .unwrap_or(ExchangeId::Moex);
1645 all_reports.push((id, ExchangeReport { market: Some(row.clone()), trading: None }));
1646 }
1647 }
1648
1649 if args.run_trading {
1651 let trading_handles: Vec<tokio::task::JoinHandle<TradingRow>> = exchanges
1652 .iter()
1653 .copied()
1654 .map(|id| {
1655 tokio::spawn(async move {
1656 timeout(Duration::from_secs(30), test_trading(id))
1657 .await
1658 .unwrap_or_else(|_| TradingRow {
1659 exchange: format!("{:?}", id),
1660 balance: MethodResult::Err("HARD_TIMEOUT_30s".into()),
1661 account_info: MethodResult::Skipped, open_orders: MethodResult::Skipped,
1662 user_trades: MethodResult::Skipped, positions: MethodResult::Skipped,
1663 fees: MethodResult::Skipped,
1664 issues: vec!["HARD_TIMEOUT_30s".into()],
1665 })
1666 })
1667 })
1668 .collect();
1669
1670 let results = futures_util::future::join_all(trading_handles).await;
1671 trading_rows = results.into_iter().filter_map(|r| r.ok()).collect();
1672 trading_rows.sort_by_key(|r| r.exchange.clone());
1673
1674 report::print_trading_matrix(&trading_rows);
1675
1676 for tr in &trading_rows {
1678 let id = exchanges.iter().find(|&&id| format!("{:?}", id) == tr.exchange).copied();
1679 if let Some(id) = id {
1680 if let Some(entry) = all_reports.iter_mut().find(|(eid, _)| *eid == id) {
1681 entry.1.trading = Some(tr.clone());
1682 } else {
1683 all_reports.push((id, ExchangeReport { market: None, trading: Some(tr.clone()) }));
1684 }
1685 }
1686 }
1687 }
1688
1689 report::print_summaries(&market_rows, &trading_rows);
1691
1692 if let Some(ref path) = args.json_out {
1694 report::write_json(path, &all_reports);
1695 }
1696
1697 println!();
1698 println!("Total runtime: {:.1}s", start.elapsed().as_secs_f64());
1699
1700 Ok(())
1701}