use polymarket_sdk::prelude::*;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
println!("=== Polymarket WebSocket Streaming ===\n");
let asset_ids = vec![
"21742633143463906290569050155826241533067272736897614950488156847949938836455".to_string(),
];
println!("1. RTDS Stream Demo (trades and activity)...");
demo_rtds_stream().await?;
println!("\n2. CLOB WebSocket Demo (price updates)...");
demo_clob_stream(&asset_ids).await?;
println!("\n=== Done! ===");
Ok(())
}
async fn demo_rtds_stream() -> Result<()> {
println!("Creating RTDS client...");
let mut client = RtdsClient::with_defaults();
let mut event_rx = client.connect().await?;
println!("Connected to RTDS, listening for events...");
println!("(Will timeout after 10 seconds)\n");
let timeout = Duration::from_secs(10);
let start = tokio::time::Instant::now();
loop {
tokio::select! {
Some(event) = event_rx.recv() => {
match event {
RtdsEvent::Connected => {
println!("[RTDS] Connected to server");
}
RtdsEvent::Trade(trade) => {
if let Some(asset) = &trade.asset {
println!(
"[TRADE] {} | Outcome: {} | Price: ${:.2}",
&asset[..20.min(asset.len())],
trade.outcome.as_deref().unwrap_or("?"),
trade.price.unwrap_or(0.0)
);
}
}
RtdsEvent::Disconnected => {
println!("[RTDS] Disconnected");
break;
}
RtdsEvent::Error(e) => {
println!("[RTDS] Error: {}", e);
}
RtdsEvent::Reconnecting { attempt, backoff_secs } => {
println!("[RTDS] Reconnecting (attempt {}, backoff {}s)", attempt, backoff_secs);
}
RtdsEvent::RawMessage(msg) => {
println!("[RTDS] Message type: {}", msg.msg_type);
}
}
}
_ = tokio::time::sleep_until(start + timeout) => {
println!("[RTDS] Timeout reached");
break;
}
}
}
client.disconnect().await;
println!("RTDS demo complete");
Ok(())
}
async fn demo_clob_stream(asset_ids: &[String]) -> Result<()> {
println!("Creating CLOB WebSocket client...");
let mut client = WssMarketClient::new();
client.subscribe(asset_ids.to_vec()).await?;
println!(
"Subscribed to {} asset(s), listening for price updates...",
asset_ids.len()
);
println!("(Will timeout after 10 seconds)\n");
let timeout = Duration::from_secs(10);
let start = tokio::time::Instant::now();
loop {
if start.elapsed() > timeout {
println!("[WSS] Timeout reached");
break;
}
tokio::select! {
result = client.next_event() => {
match result {
Ok(WssMarketEvent::Book(book)) => {
println!(
"[BOOK] Market: {} | Bids: {} | Asks: {}",
&book.market[..20.min(book.market.len())],
book.bids.len(),
book.asks.len()
);
}
Ok(WssMarketEvent::PriceChange(msg)) => {
println!(
"[PRICE] Market: {} | {} change(s)",
&msg.market[..20.min(msg.market.len())],
msg.price_changes.len()
);
for change in msg.price_changes.iter().take(2) {
println!(
" {:?} @ {} (size: {})",
change.side,
change.price,
change.size
);
}
}
Ok(WssMarketEvent::LastTrade(trade)) => {
println!(
"[TRADE] Asset: {}... | Price: {}",
&trade.asset_id[..20.min(trade.asset_id.len())],
trade.price
);
}
Ok(WssMarketEvent::TickSizeChange(msg)) => {
println!(
"[TICK] Asset: {}... | {} -> {}",
&msg.asset_id[..20.min(msg.asset_id.len())],
msg.old_tick_size,
msg.new_tick_size
);
}
Err(e) => {
println!("[WSS] Error: {:?}", e);
break;
}
}
}
_ = tokio::time::sleep(Duration::from_millis(100)) => {
}
}
}
client.close().await;
println!("CLOB WebSocket demo complete");
Ok(())
}