use quicknode_cascade::{CascadeRunner, solana};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
struct CountingPlugin {
blocks: Arc<AtomicU64>,
txs: Arc<AtomicU64>,
}
impl solana::Plugin for CountingPlugin {
fn name(&self) -> &'static str { "counter" }
fn on_block<'a>(&'a self, block: &'a solana::BlockData) -> solana::PluginFuture<'a> {
Box::pin(async move {
let n = self.blocks.fetch_add(1, Ordering::Relaxed) + 1;
println!("[block {}] slot={} txs={}", n, block.slot, block.transaction_count);
Ok(())
})
}
fn on_transaction<'a>(&'a self, tx: &'a solana::TransactionData) -> solana::PluginFuture<'a> {
Box::pin(async move {
if !tx.is_vote {
self.txs.fetch_add(1, Ordering::Relaxed);
}
Ok(())
})
}
}
fn main() {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.with_writer(std::io::stderr)
.init();
let cursor_path = "/tmp/cascade_test_cursor.json";
println!("\n=== STAGE 1: Backfill slots 300_000_000..300_000_004 ===\n");
let blocks1 = Arc::new(AtomicU64::new(0));
let txs1 = Arc::new(AtomicU64::new(0));
CascadeRunner::solana_mainnet()
.backfill(300_000_000, 300_000_004)
.concurrency(5)
.cursor_file(cursor_path)
.with_plugin(Box::new(CountingPlugin {
blocks: blocks1.clone(),
txs: txs1.clone(),
}))
.run()
.expect("stage 1 failed");
let s1_blocks = blocks1.load(Ordering::Relaxed);
let s1_txs = txs1.load(Ordering::Relaxed);
println!("\nStage 1 result: {} blocks, {} non-vote txs", s1_blocks, s1_txs);
let cursor_json = std::fs::read_to_string(cursor_path).expect("cursor file missing");
println!("Cursor after stage 1: {}", cursor_json.trim());
println!("\n=== STAGE 2: Backfill slots 300_000_000..300_000_009 (should resume from cursor) ===\n");
let blocks2 = Arc::new(AtomicU64::new(0));
let txs2 = Arc::new(AtomicU64::new(0));
CascadeRunner::solana_mainnet()
.backfill(300_000_000, 300_000_009)
.concurrency(5)
.cursor_file(cursor_path)
.with_plugin(Box::new(CountingPlugin {
blocks: blocks2.clone(),
txs: txs2.clone(),
}))
.run()
.expect("stage 2 failed");
let s2_blocks = blocks2.load(Ordering::Relaxed);
let s2_txs = txs2.load(Ordering::Relaxed);
println!("\nStage 2 result: {} blocks, {} non-vote txs", s2_blocks, s2_txs);
let final_cursor = std::fs::read_to_string(cursor_path).expect("cursor file missing");
println!("Cursor after stage 2: {}", final_cursor.trim());
println!("\n=== VERIFICATION ===");
assert!(s1_blocks == 5, "Stage 1 should process 5 blocks, got {}", s1_blocks);
assert!(s2_blocks == 5, "Stage 2 should process 5 NEW blocks (resumed), got {}", s2_blocks);
println!("PASS: Stage 1 processed {} blocks", s1_blocks);
println!("PASS: Stage 2 processed {} blocks (no duplicates from resume)", s2_blocks);
println!("PASS: Total = {} unique blocks across both stages", s1_blocks + s2_blocks);
std::fs::remove_file(cursor_path).ok();
println!("\nAll crash recovery checks passed.");
}