use deribit_websocket::prelude::*;
use serde_json::Value;
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
deribit_websocket::install_default_crypto_provider()?;
unsafe {
std::env::set_var("DERIBIT_LOG_LEVEL", "DEBUG");
}
setup_logger();
tracing::info!("🚀 Starting Deribit WebSocket Callback Example");
let processed_count = Arc::new(Mutex::new(0u32));
let error_count = Arc::new(Mutex::new(0u32));
let processed_count_clone = processed_count.clone();
let error_count_clone = error_count.clone();
setup_logger();
let mut client = DeribitWebSocketClient::default();
client.set_message_handler(
move |message: &str| -> Result<(), WebSocketError> {
match serde_json::from_str::<Value>(message) {
Ok(json) => {
let mut count = processed_count_clone.lock().unwrap();
*count += 1;
let msg_type = json
.get("method")
.or_else(|| json.get("result"))
.map(|v| v.to_string())
.unwrap_or_else(|| "unknown".to_string());
tracing::info!("📨 Processing message #{}: {}", *count, msg_type);
if (*count).is_multiple_of(5) {
tracing::warn!("⚠️ Simulating processing error for message #{}", *count);
return Err(WebSocketError::InvalidMessage(format!(
"Simulated processing error for message #{}",
*count
)));
}
Ok(())
}
Err(e) => {
tracing::error!("❌ Failed to parse JSON: {}", e);
Err(WebSocketError::InvalidMessage(format!(
"Failed to parse JSON: {}",
e
)))
}
}
},
move |message: &str, error: &WebSocketError| {
tracing::error!("🔥 Error callback triggered!");
let mut count = error_count_clone.lock().unwrap();
*count += 1;
let preview = if message.len() > 100 {
format!("{}...", &message[..100])
} else {
message.to_string()
};
tracing::error!(" Message preview: {}", preview);
tracing::error!(" Error: {}", error);
},
);
tracing::info!("🔌 Connecting to Deribit WebSocket...");
client.connect().await?;
tracing::info!("✅ Client created successfully");
tracing::info!("📡 Subscribing to market data channels...");
let channels = vec![
"ticker.BTC-PERPETUAL".to_string(),
"book.BTC-PERPETUAL.100ms".to_string(),
];
match client.subscribe(channels).await {
Ok(_) => tracing::info!("✅ Subscribed to channels"),
Err(e) => tracing::error!("❌ Subscription failed: {}", e),
}
tracing::info!("🛑 Stopping message processing...");
tracing::info!(" - Messages will be processed by the primary callback");
tracing::info!(" - Errors will be handled by the error callback");
tracing::info!("⏳ Processing messages for 15 seconds...");
let processing_task = tokio::spawn(async move { client.start_message_processing_loop().await });
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
processing_task.abort();
let final_processed = *processed_count.lock().unwrap();
let final_errors = *error_count.lock().unwrap();
tracing::info!("📊 Final Statistics:");
tracing::info!(" 💚 Successfully processed: {} messages", final_processed);
tracing::info!(" 🔴 Errors encountered: {} messages", final_errors);
tracing::info!(
" 📈 Total messages handled: {}",
final_processed + final_errors
);
tracing::info!("🎉 Callback example completed successfully!");
Ok(())
}