use std::time::Duration;
use tokio::time::timeout;
use super::connector::PolymarketConnector;
use super::websocket::{ClobWebSocket, WsEvent};
async fn find_active_token_id() -> Option<String> {
let connector = PolymarketConnector::public();
let markets = connector.get_gamma_markets(Some(50)).await.ok()?;
for market in markets.iter() {
if !market.is_tradeable() {
continue;
}
let token_id = market.yes_token_id()?;
match connector.get_order_book(token_id).await {
Ok(book) if !book.bids.is_empty() || !book.asks.is_empty() => {
let question = market.question.as_deref().unwrap_or("Unknown");
println!(
"Found active market: token_id={} question=\"{}\" bids={} asks={}",
token_id,
&question.chars().take(60).collect::<String>(),
book.bids.len(),
book.asks.len()
);
return Some(token_id.to_string());
}
Ok(_) => {
continue;
}
Err(_) => {
continue;
}
}
}
None
}
#[tokio::test]
#[ignore]
async fn test_orderbook_capabilities() {
use crate::core::AccountType;
let ws = ClobWebSocket::new(vec!["placeholder".to_string()], false);
let caps = ws.orderbook_capabilities(AccountType::Spot);
println!("Polymarket orderbook capabilities: {:?}", caps);
assert!(caps.supports_snapshot, "Polymarket must support full book snapshots");
assert!(caps.supports_delta, "Polymarket must support incremental price_change deltas");
assert!(!caps.has_sequence, "Polymarket does not use sequence numbers");
println!("supports_snapshot: {}", caps.supports_snapshot);
println!("supports_delta: {}", caps.supports_delta);
println!("has_sequence: {}", caps.has_sequence);
}
#[tokio::test]
#[ignore]
async fn test_connect() {
let token_id = match timeout(Duration::from_secs(30), find_active_token_id()).await {
Ok(Some(id)) => id,
Ok(None) => {
println!("No active token_id found — skipping test");
return;
}
Err(_) => {
println!("Timeout finding active token_id — skipping test");
return;
}
};
let mut ws = ClobWebSocket::new(vec![token_id.clone()], false);
let connect_result = timeout(Duration::from_secs(10), ws.connect()).await;
match connect_result {
Ok(Ok(())) => {
assert!(ws.is_connected(), "WebSocket must be connected after connect()");
println!("Polymarket ClobWebSocket connected successfully (token_id={})", token_id);
ws.close().await;
}
Ok(Err(e)) => println!("Connection failed: {:?}", e),
Err(_) => println!("Connection timeout"),
}
}
#[tokio::test]
#[ignore]
async fn test_subscribe_orderbook() {
let token_id = match timeout(Duration::from_secs(30), find_active_token_id()).await {
Ok(Some(id)) => id,
Ok(None) => {
println!("No active token_id found — skipping test");
return;
}
Err(_) => {
println!("Timeout finding active token_id — skipping test");
return;
}
};
let mut ws = ClobWebSocket::new(vec![token_id.clone()], false);
let connect_result = timeout(Duration::from_secs(10), ws.connect()).await;
match connect_result {
Ok(Ok(())) => {
println!(
"Connected to Polymarket WS, subscribed to token_id={} — waiting for book snapshot...",
token_id
);
let book_event = timeout(Duration::from_secs(15), async {
loop {
match ws.recv().await {
Ok(Some(WsEvent::Book(snapshot))) => return Some(snapshot),
Ok(Some(WsEvent::PriceChange(_))) => {
return None;
}
Ok(Some(WsEvent::Pong)) => continue,
Ok(Some(WsEvent::Unknown(u))) => {
if u.raw != "[]" {
println!("Unknown event (skipping): {}", u.raw);
}
continue;
}
Ok(Some(other)) => {
println!("Received non-book event (skipping): {:?}", other);
continue;
}
Ok(None) => {
println!("WebSocket closed gracefully");
return None;
}
Err(e) => {
println!("WebSocket error: {:?}", e);
return None;
}
}
}
})
.await;
match book_event {
Ok(Some(snapshot)) => {
println!(
"Book snapshot OK: {} bids, {} asks, asset_id={}",
snapshot.bids.len(),
snapshot.asks.len(),
snapshot.asset_id.as_deref().unwrap_or("unknown")
);
for bid in snapshot.bids.iter().take(3) {
let price: f64 = bid.price.parse().unwrap_or(-1.0);
assert!(
price >= 0.0 && price <= 1.0,
"Bid price must be a probability 0.0-1.0, got: {}",
bid.price
);
}
println!("Polymarket orderbook subscription works");
}
Ok(None) => println!("No book snapshot received — market may be empty"),
Err(_) => println!("Timeout waiting for book snapshot"),
}
ws.close().await;
}
Ok(Err(e)) => println!("Connection failed: {:?}", e),
Err(_) => println!("Connection timeout"),
}
}
#[tokio::test]
#[ignore]
async fn test_subscribe_trades() {
let token_id = match timeout(Duration::from_secs(30), find_active_token_id()).await {
Ok(Some(id)) => id,
Ok(None) => {
println!("No active token_id found — skipping test");
return;
}
Err(_) => {
println!("Timeout finding active token_id — skipping test");
return;
}
};
let mut ws = ClobWebSocket::new(vec![token_id.clone()], false);
let connect_result = timeout(Duration::from_secs(10), ws.connect()).await;
match connect_result {
Ok(Ok(())) => {
println!(
"Connected to Polymarket WS — waiting for last_trade_price event (token_id={})...",
token_id
);
let trade_event = timeout(Duration::from_secs(20), async {
loop {
match ws.recv().await {
Ok(Some(WsEvent::LastTradePrice(trade))) => return Some(trade),
Ok(Some(WsEvent::Pong)) => continue,
Ok(Some(WsEvent::Book(_))) => {
println!("Received book snapshot — still waiting for trade...");
continue;
}
Ok(Some(WsEvent::Unknown(u))) => {
if u.raw != "[]" {
println!("Unknown event (skipping): {}", u.raw);
}
continue;
}
Ok(Some(other)) => {
println!("Received other event (skipping): {:?}", other);
continue;
}
Ok(None) => return None,
Err(e) => {
println!("WebSocket error: {:?}", e);
return None;
}
}
}
})
.await;
match trade_event {
Ok(Some(trade)) => {
println!(
"LastTradePrice received: price={} asset_id={}",
trade.price, trade.asset_id.as_deref().unwrap_or("unknown")
);
println!("Polymarket trade subscription works");
}
Ok(None) => {
println!("No trade received within timeout (market may be inactive — this is expected for illiquid markets)");
}
Err(_) => {
println!("Timeout — no trade event within 20s (market may be inactive)");
}
}
ws.close().await;
}
Ok(Err(e)) => println!("Connection failed: {:?}", e),
Err(_) => println!("Connection timeout"),
}
}