use std::time::Duration;
use tokio::time::timeout;
use tokio_stream::StreamExt;
use crate::core::{AccountType, ConnectionStatus, StreamType, SubscriptionRequest, Symbol};
use crate::core::traits::WebSocketConnector;
use super::websocket::DydxWebSocket;
/// Builds the BTC/USD symbol that the subscription tests below request.
fn btc_usd() -> Symbol {
    let (base, quote) = ("BTC", "USD");
    Symbol::new(base, quote)
}
/// Verifies the orderbook capabilities the dYdX connector reports for a
/// `FuturesCross` account: snapshots, incremental deltas, and a sequence field.
///
/// If the WebSocket object cannot be created the failure is printed and the
/// test returns without asserting anything.
#[tokio::test]
#[ignore]
async fn test_orderbook_capabilities() {
    let socket = match DydxWebSocket::new(false, AccountType::FuturesCross).await {
        Err(reason) => {
            println!("Failed to create WebSocket: {:?}", reason);
            return;
        }
        Ok(socket) => socket,
    };

    let capabilities = socket.orderbook_capabilities(AccountType::FuturesCross);
    println!("dYdX orderbook capabilities: {:?}", capabilities);

    // Read each flag once; it is asserted first and echoed afterwards.
    let snapshot = capabilities.supports_snapshot;
    let delta = capabilities.supports_delta;
    let sequence = capabilities.has_sequence;

    assert!(snapshot, "dYdX must support snapshots");
    assert!(delta, "dYdX must support incremental deltas");
    assert!(sequence, "dYdX must carry message_id sequence");

    println!("supports_snapshot: {}", snapshot);
    println!("supports_delta: {}", delta);
    println!("has_sequence: {}", sequence);
}
/// Live-network check of the dYdX orderbook subscription for BTC/USD.
///
/// Connects with a 10 s limit, subscribes to the orderbook stream, then waits
/// up to 15 s for the first snapshot or delta event. A snapshot is checked for
/// non-empty sides, price ordering on both sides, and an uncrossed top of
/// book; a delta is only logged. Failing to create the socket, connect, or
/// subscribe is printed and ends the test early without failing it.
///
/// # Panics
///
/// Panics if the stream yields an error, ends before any orderbook event,
/// no orderbook event arrives within 15 s, or a snapshot assertion fails.
#[tokio::test]
#[ignore]
async fn test_subscribe_orderbook() {
    use crate::core::StreamEvent;

    let mut socket = match DydxWebSocket::new(false, AccountType::FuturesCross).await {
        Err(reason) => {
            println!("Failed to create WebSocket: {:?}", reason);
            return;
        }
        Ok(socket) => socket,
    };

    // Bail out quietly when the exchange cannot be reached.
    let connected = timeout(
        Duration::from_secs(10),
        socket.connect(AccountType::FuturesCross),
    )
    .await;
    match connected {
        Err(_) => {
            println!("Connection timeout");
            return;
        }
        Ok(Err(reason)) => {
            println!("Connection failed: {:?}", reason);
            return;
        }
        Ok(Ok(())) => {}
    }

    assert_eq!(socket.connection_status(), ConnectionStatus::Connected);

    let request = SubscriptionRequest::new(btc_usd(), StreamType::Orderbook);
    let subscribed = socket.subscribe(request).await;
    if subscribed.is_err() {
        println!("Subscribe failed: {:?}", subscribed.err());
        let _ = socket.disconnect().await;
        return;
    }
    println!("Subscribed to dYdX v4_orderbook (BTC-USD) — waiting for snapshot...");

    // Skip unrelated events until the first orderbook snapshot or delta shows up.
    let mut events = socket.event_stream();
    let first_book_event = timeout(Duration::from_secs(15), async {
        loop {
            match events.next().await {
                Some(Ok(
                    found @ (StreamEvent::OrderbookSnapshot(_) | StreamEvent::OrderbookDelta(_)),
                )) => break found,
                Some(Ok(unrelated)) => {
                    println!("Received non-orderbook event (skipping): {:?}", unrelated);
                }
                Some(Err(failure)) => panic!("Stream returned error: {:?}", failure),
                None => panic!("Stream ended without orderbook data"),
            }
        }
    })
    .await
    .expect(
        "Timeout waiting for orderbook data — dYdX did not send snapshot/delta within 15s",
    );

    match &first_book_event {
        StreamEvent::OrderbookSnapshot(book) => {
            assert!(!book.bids.is_empty(), "Snapshot bids must not be empty");
            assert!(!book.asks.is_empty(), "Snapshot asks must not be empty");

            // Each adjacent pair of bid prices must be non-increasing.
            let bid_prices: Vec<f64> = book.bids.iter().map(|level| level.price).collect();
            let bids_descending = bid_prices
                .iter()
                .zip(bid_prices.iter().skip(1))
                .all(|(higher, lower)| higher >= lower);
            assert!(bids_descending, "Bids must be sorted descending by price");

            // Each adjacent pair of ask prices must be non-decreasing.
            let ask_prices: Vec<f64> = book.asks.iter().map(|level| level.price).collect();
            let asks_ascending = ask_prices
                .iter()
                .zip(ask_prices.iter().skip(1))
                .all(|(lower, higher)| lower <= higher);
            assert!(asks_ascending, "Asks must be sorted ascending by price");

            // Both sides were asserted non-empty above, so index 0 exists.
            let (best_bid, best_ask) = (bid_prices[0], ask_prices[0]);
            assert!(
                best_bid < best_ask,
                "Book must not be crossed: best_bid={} best_ask={}",
                best_bid,
                best_ask
            );
            println!(
                "Orderbook snapshot OK: {} bids, {} asks, best_bid={}, best_ask={}",
                book.bids.len(),
                book.asks.len(),
                best_bid,
                best_ask
            );
        }
        _ => {
            println!(
                "Received orderbook delta (no snapshot assertions): {:?}",
                first_book_event
            );
        }
    }

    let _ = socket.disconnect().await;
    println!("dYdX orderbook subscription works");
}
/// Live-network smoke test for the dYdX trades subscription on BTC/USD.
///
/// Connects with a 10 s limit, subscribes to the trade stream, and waits up to
/// 15 s for the first event on the stream. The test is deliberately lenient:
/// it never panics, because a quiet market may produce no trade in time.
/// Failing to create the socket, connect, or subscribe is printed and ends the
/// test early.
///
/// Each outcome of the wait is reported separately (event received, stream
/// error, stream closed, timeout), and the final "works" line is printed only
/// when the stream neither errored nor closed.
#[tokio::test]
#[ignore]
async fn test_subscribe_trades() {
    let mut ws = match DydxWebSocket::new(false, AccountType::FuturesCross).await {
        Ok(w) => w,
        Err(e) => {
            println!("Failed to create WebSocket: {:?}", e);
            return;
        }
    };
    let connect_result = timeout(
        Duration::from_secs(10),
        ws.connect(AccountType::FuturesCross),
    )
    .await;
    match connect_result {
        Ok(Ok(())) => {
            let sub = SubscriptionRequest::new(btc_usd(), StreamType::Trade);
            let result = ws.subscribe(sub).await;
            if result.is_err() {
                println!("Subscribe failed: {:?}", result.err());
                let _ = ws.disconnect().await;
                return;
            }
            println!("Subscribed to dYdX v4_trades (BTC-USD) — waiting for trade...");
            let mut stream = ws.event_stream();
            let event = timeout(Duration::from_secs(15), stream.next()).await;
            // Distinguish the four possible outcomes instead of reporting every
            // non-success as a timeout.
            // NOTE(review): the first event is not checked for being a trade;
            // confirm whether other event kinds can arrive first.
            let stream_healthy = match event {
                Ok(Some(Ok(ev))) => {
                    println!("Received trade event: {:?}", ev);
                    true
                }
                Err(_) => {
                    println!("No trade event received within timeout (market may be slow)");
                    true
                }
                Ok(Some(Err(e))) => {
                    println!("Trade stream returned error: {:?}", e);
                    false
                }
                Ok(None) => {
                    println!("Trade stream ended before delivering any event");
                    false
                }
            };
            let _ = ws.disconnect().await;
            // Only claim success when the stream did not fail or close.
            if stream_healthy {
                println!("dYdX trades subscription works");
            }
        }
        Ok(Err(e)) => println!("Connection failed: {:?}", e),
        Err(_) => println!("Connection timeout"),
    }
}