use sol_parser_sdk::core::now_micros;
use sol_parser_sdk::shredstream::{ShredStreamClient, ShredStreamConfig};
use sol_parser_sdk::DexEvent;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// Folds `value` into a pair of shared running extremes.
///
/// After the call, `min` holds the smaller of its previous content and
/// `value`, and `max` holds the larger of its previous content and `value`.
/// Each counter is updated with a single atomic read-modify-write, so
/// concurrent callers cannot lose an update. The two counters are updated
/// independently of each other (there is no joint atomicity across the pair).
///
/// `Ordering::Relaxed` is used because the counters are plain statistics and
/// are not used to publish any other memory.
fn update_min_max(min: &Arc<AtomicU64>, max: &Arc<AtomicU64>, value: u64) {
    // `fetch_min` / `fetch_max` perform the compare-and-retry loop internally.
    min.fetch_min(value, Ordering::Relaxed);
    max.fetch_max(value, Ordering::Relaxed);
}
/// Program entry point.
///
/// Prints the banner and then runs [`run_example`] on the tokio runtime,
/// forwarding any error it returns to the caller.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("🚀 ShredStream Low-Latency Test");
    println!("================================\n");

    // All of the actual work lives in `run_example`; propagate its failure as-is.
    run_example().await?;
    Ok(())
}
/// Connects to a local ShredStream endpoint, consumes events from the
/// subscription queue and reports queue-latency statistics.
///
/// Two background tasks are spawned:
/// * a **stats** task that prints a summary table every 10 seconds, and
/// * a **consumer** task that pops events from the queue, measures the time
///   between the event's `grpc_recv_us` timestamp and the moment it was
///   popped, and prints each event.
///
/// The function then waits until either Ctrl+C is received or 10 minutes have
/// elapsed, stops the client and returns.
///
/// # Errors
///
/// Returns an error if the client cannot be created, if the subscription
/// fails, or if the Ctrl+C handler cannot be installed.
async fn run_example() -> Result<(), Box<dyn std::error::Error>> {
    /// Endpoint used both for the banner and for the actual connection.
    const ENDPOINT: &str = "http://127.0.0.1:10800";
    /// Seconds between two statistics reports.
    const STATS_INTERVAL_SECS: u64 = 10;
    /// Queue depth above which a backlog warning is printed.
    const QUEUE_WARN_LEN: usize = 1000;
    /// Number of consecutive empty polls before yielding to the scheduler.
    const SPIN_LIMIT: u32 = 1000;
    /// Maximum run time before the example stops on its own.
    const AUTO_STOP_SECS: u64 = 600;

    let config = ShredStreamConfig {
        connection_timeout_ms: 5000,
        request_timeout_ms: 30000,
        max_decoding_message_size: 1024 * 1024 * 1024,
        reconnect_delay_ms: 1000,
        // NOTE(review): the banner below advertises "Reconnect: infinite", so 0
        // presumably means "no limit" — confirm against ShredStreamConfig docs.
        max_reconnect_attempts: 0,
    };

    println!("📋 Configuration:");
    println!(" Endpoint: {}", ENDPOINT);
    println!(" Reconnect: infinite");
    println!();

    let client = ShredStreamClient::new_with_config(ENDPOINT, config).await?;
    println!("✅ ShredStream client connected");
    println!("🎧 Starting subscription...\n");
    let queue = client.subscribe().await?;

    // Shared counters: written by the consumer task, read by the stats task.
    let event_count = Arc::new(AtomicU64::new(0));
    let total_latency = Arc::new(AtomicU64::new(0));
    // `u64::MAX` marks "no sample yet"; the stats task prints it as 0.
    let min_latency = Arc::new(AtomicU64::new(u64::MAX));
    let max_latency = Arc::new(AtomicU64::new(0));

    // ---- Stats task: periodic summary table --------------------------------
    let stats_count = Arc::clone(&event_count);
    let stats_total = Arc::clone(&total_latency);
    let stats_min = Arc::clone(&min_latency);
    let stats_max = Arc::clone(&max_latency);
    let queue_for_stats = queue.clone();
    tokio::spawn(async move {
        let mut last_count = 0u64;
        loop {
            tokio::time::sleep(std::time::Duration::from_secs(STATS_INTERVAL_SECS)).await;
            let count = stats_count.load(Ordering::Relaxed);
            let total = stats_total.load(Ordering::Relaxed);
            let min = stats_min.load(Ordering::Relaxed);
            let max = stats_max.load(Ordering::Relaxed);
            let queue_len = queue_for_stats.len();
            // Skip the report (and the division) until at least one event was measured.
            if count > 0 {
                // Average is cumulative since start; the rate covers the last interval only.
                let avg = total / count;
                let events_per_sec = (count - last_count) as f64 / STATS_INTERVAL_SECS as f64;
                println!("\n╔════════════════════════════════════════════════════╗");
                println!("║ 性能统计 (10秒间隔) ║");
                println!("╠════════════════════════════════════════════════════╣");
                println!("║ 事件总数: {:>10} ║", count);
                println!("║ 事件速率: {:>10.1} events/sec ║", events_per_sec);
                println!("║ 队列长度: {:>10} ║", queue_len);
                println!("║ 平均延迟: {:>10} μs ║", avg);
                println!(
                    "║ 最小延迟: {:>10} μs ║",
                    if min == u64::MAX { 0 } else { min }
                );
                println!("║ 最大延迟: {:>10} μs ║", max);
                println!("╚════════════════════════════════════════════════════╝\n");
                if queue_len > QUEUE_WARN_LEN {
                    println!("⚠️ 警告: 队列堆积 ({}), 消费速度 < 生产速度", queue_len);
                }
            }
            last_count = count;
        }
    });

    // ---- Consumer task: pop events and measure queue latency ---------------
    let consumer_event_count = Arc::clone(&event_count);
    let consumer_total_latency = Arc::clone(&total_latency);
    let consumer_min_latency = Arc::clone(&min_latency);
    let consumer_max_latency = Arc::clone(&max_latency);
    tokio::spawn(async move {
        let mut spin_count = 0u32;
        loop {
            if let Some(event) = queue.pop() {
                spin_count = 0;
                // Take the timestamp first so printing does not inflate this sample.
                let queue_recv_us = now_micros();
                let grpc_recv_us_opt = event.metadata().map(|m| m.grpc_recv_us);
                // Events without metadata are dropped silently and not counted.
                if let Some(grpc_recv_us) = grpc_recv_us_opt {
                    // Clamp to 0 instead of wrapping: if the receive timestamp is
                    // ahead of the local clock reading, a plain `as u64` cast of the
                    // difference would produce a huge bogus latency and corrupt the
                    // total/average/max statistics.
                    let latency_us =
                        u64::try_from(queue_recv_us.saturating_sub(grpc_recv_us)).unwrap_or(0);
                    consumer_event_count.fetch_add(1, Ordering::Relaxed);
                    consumer_total_latency.fetch_add(latency_us, Ordering::Relaxed);
                    update_min_max(&consumer_min_latency, &consumer_max_latency, latency_us);
                    println!("\n================================================");
                    println!("ShredStream接收时间: {} μs", grpc_recv_us);
                    println!("事件接收时间: {} μs", queue_recv_us);
                    println!("延迟时间: {} μs", latency_us);
                    println!("队列长度: {}", queue.len());
                    println!("================================================");
                    println!("{:?}", event);
                    println!();
                }
            } else {
                // Empty queue: spin briefly, then yield so this task does not
                // monopolise its runtime worker thread.
                spin_count += 1;
                if spin_count < SPIN_LIMIT {
                    std::hint::spin_loop();
                } else {
                    tokio::task::yield_now().await;
                    spin_count = 0;
                }
            }
        }
    });

    println!("🛑 Press Ctrl+C to stop...\n");
    // Race the two shutdown triggers so that the auto-stop timer actually ends
    // the example instead of leaving it blocked on Ctrl+C.
    tokio::select! {
        signal = tokio::signal::ctrl_c() => {
            signal?;
            println!("\n👋 Shutting down gracefully...");
        }
        _ = tokio::time::sleep(std::time::Duration::from_secs(AUTO_STOP_SECS)) => {
            println!("⏰ Auto-stopping after 10 minutes...");
        }
    }
    client.stop().await;
    Ok(())
}