use std::time::Duration;
use futures_util::{SinkExt, StreamExt};
use tokio::time::timeout;
use tokio_tungstenite::{connect_async, tungstenite::Message};
#[tokio::main]
async fn main() {
let url = "wss://ws.bitstamp.net";
eprintln!("[bitstamp_trade_capture] connecting to {}", url);
let (ws_stream, _) = connect_async(url).await.expect("WS connect failed");
eprintln!("[bitstamp_trade_capture] connected");
let (mut writer, mut reader) = ws_stream.split();
let sub = r#"{"event":"bts:subscribe","data":{"channel":"live_trades_btcusd"}}"#;
eprintln!("[bitstamp_trade_capture] sending: {}", sub);
writer
.send(Message::Text(sub.to_string()))
.await
.expect("send failed");
let mut frame_count = 0usize;
let mut trade_count = 0usize;
let mut other_count = 0usize;
let capture_secs = 120u64;
eprintln!(
"[bitstamp_trade_capture] capturing {} seconds...",
capture_secs
);
let result = timeout(Duration::from_secs(capture_secs), async {
while let Some(msg) = reader.next().await {
match msg {
Ok(Message::Text(text)) => {
frame_count += 1;
let parsed: serde_json::Value =
serde_json::from_str(&text).unwrap_or(serde_json::Value::Null);
let event = parsed
.get("event")
.and_then(|v| v.as_str())
.unwrap_or("<none>");
let channel = parsed
.get("channel")
.and_then(|v| v.as_str())
.unwrap_or("<none>");
if event == "trade" {
trade_count += 1;
if trade_count <= 5 {
eprintln!(
"[TRADE frame #{}] event={} channel={} raw={}",
trade_count, event, channel, text
);
}
let data_field = parsed.get("data");
if trade_count == 1 {
eprintln!(
"[TRADE data field type] is_string={} is_object={} raw_data={:?}",
data_field.map(|v| v.is_string()).unwrap_or(false),
data_field.map(|v| v.is_object()).unwrap_or(false),
data_field
);
}
} else {
other_count += 1;
eprintln!(
"[OTHER frame #{}] event={} channel={} raw={}",
other_count, event, channel, &text[..text.len().min(300)]
);
}
}
Ok(Message::Ping(d)) => {
eprintln!("[bitstamp_trade_capture] recv Ping, sending Pong");
let _ = writer.send(Message::Pong(d)).await;
}
Ok(Message::Close(_)) => {
eprintln!("[bitstamp_trade_capture] server closed connection");
break;
}
Ok(_) => {}
Err(e) => {
eprintln!("[bitstamp_trade_capture] WS error: {}", e);
break;
}
}
}
})
.await;
if result.is_err() {
eprintln!(
"[bitstamp_trade_capture] {} second window elapsed",
capture_secs
);
}
println!("=== SUMMARY ===");
println!("total_frames: {}", frame_count);
println!("trade_frames: {}", trade_count);
println!("other_frames: {}", other_count);
if trade_count == 0 {
println!("DIAGNOSIS: zero trade frames — check subscribe ack and channel name");
} else {
println!("DIAGNOSIS: trades flowing OK");
}
}