use deribit_websocket::config::WebSocketConfig;
use deribit_websocket::prelude::*;
use std::sync::{Arc, Mutex};
/// Example: subscribe to a few Deribit `trades.*` channels, listen for
/// 15 seconds, and log per-trade details plus aggregate statistics.
///
/// The message handler counts every trade found in `subscription`
/// notifications whose channel starts with `trades.` and sums their
/// `amount` fields; the totals are shared with `main` through
/// `Arc<Mutex<_>>` and reported once the listening window ends.
///
/// # Errors
/// Propagates failures from installing the crypto provider, constructing
/// the client, and connecting. A failed subscription is only logged.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    deribit_websocket::install_default_crypto_provider()?;

    // SAFETY: `set_var` is only sound while no other thread reads or writes
    // the process environment. Nothing in this function has spawned work yet.
    // NOTE(review): `#[tokio::main]` may already have started runtime worker
    // threads at this point; this relies on them not touching the
    // environment concurrently — confirm, or set the variable before the
    // runtime is built.
    unsafe {
        std::env::set_var("DERIBIT_LOG_LEVEL", "DEBUG");
    }
    setup_logger();

    tracing::info!("🚀 Starting Real-time Trades Subscription Example");

    // Counters shared between the message handler (writer) and `main`
    // (reader, after the processing task has been aborted).
    let trade_count = Arc::new(Mutex::new(0u32));
    let total_volume = Arc::new(Mutex::new(0.0f64));
    let trade_count_clone = trade_count.clone();
    let volume_clone = total_volume.clone();

    let config = WebSocketConfig::default();
    let mut client = DeribitWebSocketClient::new(&config)?;

    client.set_message_handler(
        // Success path: pick out trade notifications and update the counters.
        // Anything that is not a `trades.*` subscription message is ignored.
        move |message: &str| -> Result<(), WebSocketError> {
            if let Ok(json_msg) = serde_json::from_str::<Value>(message)
                && let Some(method) = json_msg.get("method")
                && method.as_str() == Some("subscription")
                && let Some(params) = json_msg.get("params")
                && let Some(channel) = params.get("channel").and_then(|c| c.as_str())
                && channel.starts_with("trades.")
                && let Some(data) = params.get("data")
            {
                if let Some(trades) = data.as_array() {
                    // `data` is a batch of trades: take both locks once for
                    // the whole notification rather than once per trade.
                    let mut count = trade_count_clone.lock().unwrap();
                    let mut volume = volume_clone.lock().unwrap();
                    for trade in trades {
                        *count += 1;
                        // Missing or mistyped fields fall back to neutral
                        // defaults so one odd trade does not stop the loop.
                        let price = trade.get("price").and_then(|p| p.as_f64()).unwrap_or(0.0);
                        let amount = trade.get("amount").and_then(|a| a.as_f64()).unwrap_or(0.0);
                        let direction = trade
                            .get("direction")
                            .and_then(|d| d.as_str())
                            .unwrap_or("unknown");
                        let timestamp =
                            trade.get("timestamp").and_then(|t| t.as_u64()).unwrap_or(0);
                        let trade_id = trade
                            .get("trade_id")
                            .and_then(|id| id.as_str())
                            .unwrap_or("unknown");
                        *volume += amount;
                        tracing::info!(
                            "💰 Trade #{}: {} {} @ {} (ID: {}, Time: {})",
                            *count,
                            direction,
                            amount,
                            price,
                            trade_id,
                            timestamp
                        );
                    }
                } else if let Some(trade) = data.as_object() {
                    // `data` is a single trade object rather than a batch.
                    let mut count = trade_count_clone.lock().unwrap();
                    let mut volume = volume_clone.lock().unwrap();
                    *count += 1;
                    let price = trade.get("price").and_then(|p| p.as_f64()).unwrap_or(0.0);
                    let amount = trade.get("amount").and_then(|a| a.as_f64()).unwrap_or(0.0);
                    let direction = trade
                        .get("direction")
                        .and_then(|d| d.as_str())
                        .unwrap_or("unknown");
                    *volume += amount;
                    tracing::info!("💰 Trade #{}: {} {} @ {}", *count, direction, amount, price);
                }
            }
            Ok(())
        },
        // Error path: log the error and a bounded preview of the message.
        |message: &str, error: &WebSocketError| {
            tracing::error!("❌ Error processing trade message: {}", error);
            tracing::error!(
                " Message preview: {}",
                if message.len() > 100 {
                    // Slicing at a fixed byte offset panics when that offset
                    // falls inside a multi-byte character, so cut on a
                    // character boundary instead.
                    format!("{}...", truncate_on_char_boundary(message, 100))
                } else {
                    message.to_string()
                }
            );
        },
    );

    tracing::info!("🔌 Connecting to Deribit WebSocket...");
    client.connect().await?;
    tracing::info!("✅ Connected successfully");

    tracing::info!("📊 Subscribing to real-time trades...");
    let channels = vec![
        "trades.BTC-PERPETUAL.raw".to_string(),
        "trades.ETH-PERPETUAL.raw".to_string(),
        // NOTE(review): dated instrument name; presumably an expired future
        // by now — confirm it still exists or replace it.
        "trades.BTC-29MAR24.raw".to_string(),
    ];
    // A subscription failure is logged but does not end the example.
    match client.subscribe(channels).await {
        Ok(_) => tracing::info!("✅ Successfully subscribed to trades channels"),
        Err(e) => tracing::error!("❌ Subscription failed: {}", e),
    }

    tracing::info!("🎯 Listening for real-time trades (15 seconds)...");
    tracing::info!(" - Each trade will show direction, amount, and price");
    tracing::info!(" - Volume statistics will be tracked");

    // The client is moved into the background task; the task is aborted once
    // the fixed listening window has elapsed.
    let processing_task = tokio::spawn(async move { client.start_message_processing_loop().await });
    tokio::time::sleep(std::time::Duration::from_secs(15)).await;
    processing_task.abort();

    let final_trades = *trade_count.lock().unwrap();
    let final_volume = *total_volume.lock().unwrap();

    tracing::info!("\n📊 Trade Statistics:");
    tracing::info!(" 💰 Total trades processed: {}", final_trades);
    tracing::info!(" 📈 Total volume: {:.4}", final_volume);
    if final_trades > 0 {
        tracing::info!(
            " 📊 Average trade size: {:.4}",
            final_volume / f64::from(final_trades)
        );
    } else {
        tracing::info!("\n💡 Tips for trade data:");
        tracing::info!(" - Trade data depends on market activity");
        tracing::info!(" - Try during active trading hours");
        tracing::info!(" - BTC-PERPETUAL usually has the most activity");
    }

    tracing::info!("\n🎉 Real-time trades example completed!");
    Ok(())
}

/// Returns the longest prefix of `message` that is at most `max_bytes` bytes
/// long and ends on a UTF-8 character boundary.
///
/// Returns `message` unchanged when it already fits.
fn truncate_on_char_boundary(message: &str, max_bytes: usize) -> &str {
    if message.len() <= max_bytes {
        return message;
    }
    // Offset 0 is always a boundary, so this loop terminates.
    let mut end = max_bytes;
    while !message.is_char_boundary(end) {
        end -= 1;
    }
    &message[..end]
}