use quicknode_cascade::{CascadeRunner, solana};
use std::sync::{Arc, Mutex};
struct Collector {
results: Arc<Mutex<Vec<(u64, u64)>>>,
}
impl solana::Plugin for Collector {
fn name(&self) -> &'static str {
"collector"
}
fn on_block<'a>(&'a self, block: &'a solana::BlockData) -> solana::PluginFuture<'a> {
Box::pin(async move {
self.results
.lock()
.unwrap()
.push((block.slot, block.transaction_count));
Ok(())
})
}
}
fn run_with_concurrency(concurrency: usize) -> Vec<(u64, u64)> {
let results = Arc::new(Mutex::new(Vec::new()));
let cursor = format!("/tmp/cascade_concurrency_test_{}.cursor", concurrency);
std::fs::remove_file(&cursor).ok();
CascadeRunner::solana_mainnet()
.backfill(300_000_000, 300_000_009)
.concurrency(concurrency)
.cursor_file(&cursor)
.with_plugin(Box::new(Collector {
results: results.clone(),
}))
.run()
.expect("backfill failed");
std::fs::remove_file(&cursor).ok();
let mut data = results.lock().unwrap().clone();
data.sort_by_key(|(slot, _)| *slot);
data
}
fn main() {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.with_writer(std::io::stderr)
.init();
println!("=== Concurrency Correctness Test ===\n");
println!("--- Run 1: concurrency=1 (sequential) ---");
let low = run_with_concurrency(1);
println!(" Collected {} blocks\n", low.len());
println!("--- Run 2: concurrency=50 (parallel) ---");
let high = run_with_concurrency(50);
println!(" Collected {} blocks\n", high.len());
println!("=== Verification ===\n");
assert_eq!(
low.len(),
high.len(),
"Block count mismatch: concurrency=1 got {}, concurrency=50 got {}",
low.len(),
high.len()
);
println!("PASS: Same block count ({})", low.len());
for (a, b) in low.iter().zip(high.iter()) {
assert_eq!(
a.0, b.0,
"Slot order mismatch: concurrency=1 has slot {}, concurrency=50 has slot {}",
a.0, b.0
);
assert_eq!(
a.1, b.1,
"Transaction count mismatch at slot {}: concurrency=1 has {}, concurrency=50 has {}",
a.0, a.1, b.1
);
}
println!("PASS: Identical slot ordering");
println!("PASS: Identical transaction counts per slot");
println!("\nSlots processed:");
for (slot, tx_count) in &low {
println!(" slot={} txs={}", slot, tx_count);
}
println!("\nAll concurrency checks passed.");
}