// quicknode-cascade 0.2.3
//
// Stream blockchain data at scale. Plugin-based framework powered by
// QuickNode Cascade — start with Solana, more chains coming.
// (See the crate documentation for details.)
//! Crash recovery test: runs backfill in two stages to verify cursor resume.
//!
//! Stage 1: Backfills slots 300_000_000..300_000_004 (5 slots)
//! Stage 2: Backfills the full range 300_000_000..300_000_009 (10 slots),
//!          resuming from the cursor — should skip the 5 already-processed slots
//!
//! Usage: cargo run --release --example crash_recovery_test

use quicknode_cascade::{CascadeRunner, solana};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Test plugin that counts processed blocks and non-vote transactions.
/// Counters are shared `Arc<AtomicU64>`s so `main` can read the totals
/// after each backfill stage completes.
struct CountingPlugin {
    // Total blocks delivered to `on_block`.
    blocks: Arc<AtomicU64>,
    // Total non-vote transactions delivered to `on_transaction`.
    txs: Arc<AtomicU64>,
}

impl solana::Plugin for CountingPlugin {
    fn name(&self) -> &'static str {
        "counter"
    }

    /// Bumps the block counter and logs the slot plus its transaction count.
    fn on_block<'a>(&'a self, block: &'a solana::BlockData) -> solana::PluginFuture<'a> {
        Box::pin(async move {
            // fetch_add returns the previous value, so add 1 for the 1-based count.
            let seen = 1 + self.blocks.fetch_add(1, Ordering::Relaxed);
            println!(
                "[block {}] slot={} txs={}",
                seen, block.slot, block.transaction_count
            );
            Ok(())
        })
    }

    /// Counts the transaction unless it is a vote; votes are ignored.
    fn on_transaction<'a>(&'a self, tx: &'a solana::TransactionData) -> solana::PluginFuture<'a> {
        Box::pin(async move {
            if tx.is_vote {
                // Vote transactions are noise for this test — skip them.
            } else {
                self.txs.fetch_add(1, Ordering::Relaxed);
            }
            Ok(())
        })
    }
}

/// Initializes stderr logging, honoring `RUST_LOG` and defaulting to `info`.
fn init_tracing() {
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
        )
        .with_writer(std::io::stderr)
        .init();
}

/// Runs one backfill stage over slots `start..=end` with progress persisted
/// to `cursor_path`, and returns `(blocks_processed, non_vote_txs_processed)`.
///
/// Panics with `label` on runner failure — appropriate for a test binary.
fn run_stage(label: &str, start: u64, end: u64, cursor_path: &str) -> (u64, u64) {
    let blocks = Arc::new(AtomicU64::new(0));
    let txs = Arc::new(AtomicU64::new(0));

    CascadeRunner::solana_mainnet()
        .backfill(start, end)
        .concurrency(5)
        .cursor_file(cursor_path)
        .with_plugin(Box::new(CountingPlugin {
            blocks: Arc::clone(&blocks),
            txs: Arc::clone(&txs),
        }))
        .run()
        .unwrap_or_else(|e| panic!("{} failed: {:?}", label, e));

    (blocks.load(Ordering::Relaxed), txs.load(Ordering::Relaxed))
}

fn main() {
    init_tracing();

    let cursor_path = "/tmp/cascade_test_cursor.json";

    // --- STAGE 1: Backfill first 5 slots ---
    println!("\n=== STAGE 1: Backfill slots 300_000_000..300_000_004 ===\n");
    let (s1_blocks, s1_txs) = run_stage("stage 1", 300_000_000, 300_000_004, cursor_path);
    println!("\nStage 1 result: {} blocks, {} non-vote txs", s1_blocks, s1_txs);

    // Read cursor — stage 2 resumes from this checkpoint.
    let cursor_json = std::fs::read_to_string(cursor_path).expect("cursor file missing");
    println!("Cursor after stage 1: {}", cursor_json.trim());

    // --- STAGE 2: Backfill full range — should resume from cursor ---
    println!("\n=== STAGE 2: Backfill slots 300_000_000..300_000_009 (should resume from cursor) ===\n");
    let (s2_blocks, s2_txs) = run_stage("stage 2", 300_000_000, 300_000_009, cursor_path);
    println!("\nStage 2 result: {} blocks, {} non-vote txs", s2_blocks, s2_txs);

    // Read final cursor
    let final_cursor = std::fs::read_to_string(cursor_path).expect("cursor file missing");
    println!("Cursor after stage 2: {}", final_cursor.trim());

    // --- VERIFY ---
    // assert_eq! reports both sides on failure, so the counts need not be
    // repeated in the message.
    println!("\n=== VERIFICATION ===");
    assert_eq!(s1_blocks, 5, "Stage 1 should process 5 blocks");
    assert_eq!(s2_blocks, 5, "Stage 2 should process 5 NEW blocks (resumed)");
    println!("PASS: Stage 1 processed {} blocks", s1_blocks);
    println!("PASS: Stage 2 processed {} blocks (no duplicates from resume)", s2_blocks);
    println!("PASS: Total = {} unique blocks across both stages", s1_blocks + s2_blocks);

    // Cleanup
    std::fs::remove_file(cursor_path).ok();
    println!("\nAll crash recovery checks passed.");
}