use std::time::Duration;
use futures_util::StreamExt;
use tokio::time::timeout;
use digdigdig3::connector_manager::ExchangeHub;
use digdigdig3::core::types::{
AccountType, ExchangeId, StreamEvent, StreamType, Symbol, SubscriptionRequest,
};
/// Checks that Ticker subscriptions for two Bitfinex spot symbols both deliver events.
///
/// The test connects through [`ExchangeHub`], subscribes to `StreamType::Ticker`
/// for the raw symbols `tBTCUSD` and `tETHUSD`, and then reads the event stream
/// for at most 30 seconds. It passes once a ticker has been received for each
/// symbol; every such ticker must carry a positive `last_price`. Stream errors
/// are only logged and do not fail the test.
///
/// Marked `#[ignore]`, so it runs only when ignored tests are requested
/// explicitly (presumably because it needs live access to Bitfinex).
#[tokio::test]
#[ignore]
async fn bitfinex_ticker_two_symbols_receive_events() {
    let exchange_hub = ExchangeHub::new();
    exchange_hub
        .connect_full(ExchangeId::Bitfinex, &[AccountType::Spot], false)
        .await
        .expect("connect_full Bitfinex");

    let socket = exchange_hub
        .ws(ExchangeId::Bitfinex, AccountType::Spot)
        .expect("no WS connector after connect_full");
    socket
        .connect(AccountType::Spot)
        .await
        .expect("ws.connect");

    // Each entry pairs the symbol to subscribe with the panic message used
    // if that particular subscription is rejected.
    let subscriptions = [
        (
            Symbol::with_raw("", "", "tBTCUSD".to_string()),
            "subscribe Ticker tBTCUSD",
        ),
        (
            Symbol::with_raw("", "", "tETHUSD".to_string()),
            "subscribe Ticker tETHUSD",
        ),
    ];
    for (symbol, failure_msg) in subscriptions {
        socket
            .subscribe(SubscriptionRequest {
                symbol,
                stream_type: StreamType::Ticker,
                account_type: AccountType::Spot,
                depth: None,
                update_speed_ms: None,
            })
            .await
            .expect(failure_msg);
    }

    let mut events = socket.event_stream();
    let mut btc_seen = false;
    let mut eth_seen = false;

    let outcome = timeout(Duration::from_secs(30), async {
        loop {
            match events.next().await {
                // Stream closed: stop reading and let the final asserts decide.
                None => break,
                Some(Ok(StreamEvent::Ticker { symbol, ticker })) => {
                    let bid_text = ticker
                        .bid_price
                        .map_or_else(String::new, |p| format!("{:.2}", p));
                    let ask_text = ticker
                        .ask_price
                        .map_or_else(String::new, |p| format!("{:.2}", p));
                    eprintln!(
                        " Ticker: {} last={:.2} bid={} ask={}",
                        symbol, ticker.last_price, bid_text, ask_text,
                    );

                    if symbol == "tBTCUSD" {
                        assert!(
                            ticker.last_price > 0.0,
                            "BTC ticker last_price must be positive, got {}",
                            ticker.last_price
                        );
                        btc_seen = true;
                    } else if symbol == "tETHUSD" {
                        assert!(
                            ticker.last_price > 0.0,
                            "ETH ticker last_price must be positive, got {}",
                            ticker.last_price
                        );
                        eth_seen = true;
                    }
                }
                Some(Err(e)) => eprintln!("stream error: {:?}", e),
                // Any other event kind is irrelevant to this test.
                Some(Ok(_)) => {}
            }

            if btc_seen && eth_seen {
                break;
            }
        }
    })
    .await;

    assert!(outcome.is_ok(), "timed out waiting for Bitfinex Ticker events (30s)");
    assert!(btc_seen, "no Ticker received for tBTCUSD");
    assert!(eth_seen, "no Ticker received for tETHUSD");
}
/// Subscribes to the Bitfinex `liq:global` liquidation feed and listens for events.
///
/// The test connects through [`ExchangeHub`], subscribes to
/// `StreamType::Liquidation` using the raw symbol `liq:global`, and reads the
/// event stream for at most 60 seconds. It stops at the first liquidation,
/// whose symbol, price, quantity and timestamp are sanity-checked.
///
/// Hitting the 60 second timeout without any liquidation is tolerated (it is
/// only logged); the test fails only if the stream yielded an error or a
/// received liquidation has invalid fields. `subscribe_ok` is reported in the
/// final log line but is not asserted.
///
/// Marked `#[ignore]`, so it runs only when ignored tests are requested
/// explicitly (presumably because it needs live access to Bitfinex).
#[tokio::test]
#[ignore]
async fn bitfinex_liq_global_subscribe_and_receive() {
    // Number of liquidations after which listening stops early.
    const WANTED_LIQUIDATIONS: usize = 1;

    let exchange_hub = ExchangeHub::new();
    exchange_hub
        .connect_full(ExchangeId::Bitfinex, &[AccountType::Spot], false)
        .await
        .expect("connect_full Bitfinex");

    let socket = exchange_hub
        .ws(ExchangeId::Bitfinex, AccountType::Spot)
        .expect("no WS connector after connect_full");
    socket
        .connect(AccountType::Spot)
        .await
        .expect("ws.connect");

    let request = SubscriptionRequest {
        symbol: Symbol::with_raw("", "", "liq:global".to_string()),
        stream_type: StreamType::Liquidation,
        account_type: AccountType::Spot,
        depth: None,
        update_speed_ms: None,
    };
    socket
        .subscribe(request)
        .await
        .expect("subscribe Liquidation liq:global");
    eprintln!("Subscribed to liq:global — listening for 60s (sparse feed)");

    let mut events = socket.event_stream();
    let mut liq_count = 0usize;
    let mut error_count = 0usize;
    let mut subscribe_ok = false;

    let outcome = timeout(Duration::from_secs(60), async {
        loop {
            match events.next().await {
                // Stream closed: nothing more to read.
                None => break,
                Some(Err(e)) => {
                    eprintln!("stream error: {:?}", e);
                    error_count += 1;
                }
                Some(Ok(received)) => {
                    // Any successfully received event marks the subscription as working.
                    subscribe_ok = true;
                    if let StreamEvent::Liquidation {
                        symbol,
                        side,
                        price,
                        quantity,
                        timestamp,
                        value,
                    } = received
                    {
                        liq_count += 1;
                        eprintln!(
                            " Liquidation #{}: symbol='{}' side={:?} price={:.4} qty={:.6} ts={} value={:?}",
                            liq_count, symbol, side, price, quantity, timestamp, value
                        );
                        assert!(!symbol.is_empty(), "symbol must not be empty");
                        assert!(price > 0.0, "liquidation price must be positive");
                        assert!(quantity > 0.0, "liquidation quantity must be positive");
                        assert!(timestamp > 0, "liquidation timestamp must be positive");
                        if liq_count >= WANTED_LIQUIDATIONS {
                            break;
                        }
                    }
                }
            }
        }
    })
    .await;

    // A timeout is not a failure here; it is only reported.
    if let Err(_) = outcome {
        eprintln!(
            "60s window elapsed: {} liqs caught, {} errors. Sparse feed — acceptable.",
            liq_count, error_count
        );
    }

    assert_eq!(
        error_count, 0,
        "no stream errors expected for liq:global subscribe"
    );
    eprintln!(
        "liq:global test complete: {} liquidations caught, subscribe_ok={}",
        liq_count, subscribe_ok
    );
}