quicknode-cascade 0.2.3

Stream blockchain data at scale. Plugin-based framework powered by QuickNode Cascade — start with Solana, more chains coming.
Documentation
//! Concurrency correctness test — proves identical results at low and high parallelism.
//!
//! Runs the same 10-slot range at concurrency=1 (sequential) and concurrency=50 (parallel).
//! Collects (slot, transaction_count) pairs from both runs and asserts they match exactly.
//! This proves the parallel fetch + sort + ordered dispatch path is correct.
//!
//!   cargo run --release --example concurrency_test

use quicknode_cascade::{CascadeRunner, solana};
use std::sync::{Arc, Mutex};

struct Collector {
    results: Arc<Mutex<Vec<(u64, u64)>>>,
}

impl solana::Plugin for Collector {
    fn name(&self) -> &'static str {
        "collector"
    }

    fn on_block<'a>(&'a self, block: &'a solana::BlockData) -> solana::PluginFuture<'a> {
        Box::pin(async move {
            self.results
                .lock()
                .unwrap()
                .push((block.slot, block.transaction_count));
            Ok(())
        })
    }
}

fn run_with_concurrency(concurrency: usize) -> Vec<(u64, u64)> {
    let results = Arc::new(Mutex::new(Vec::new()));
    let cursor = format!("/tmp/cascade_concurrency_test_{}.cursor", concurrency);
    std::fs::remove_file(&cursor).ok();

    CascadeRunner::solana_mainnet()
        .backfill(300_000_000, 300_000_009)
        .concurrency(concurrency)
        .cursor_file(&cursor)
        .with_plugin(Box::new(Collector {
            results: results.clone(),
        }))
        .run()
        .expect("backfill failed");

    std::fs::remove_file(&cursor).ok();
    let mut data = results.lock().unwrap().clone();
    data.sort_by_key(|(slot, _)| *slot);
    data
}

fn main() {
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
        )
        .with_writer(std::io::stderr)
        .init();

    println!("=== Concurrency Correctness Test ===\n");

    println!("--- Run 1: concurrency=1 (sequential) ---");
    let low = run_with_concurrency(1);
    println!("  Collected {} blocks\n", low.len());

    println!("--- Run 2: concurrency=50 (parallel) ---");
    let high = run_with_concurrency(50);
    println!("  Collected {} blocks\n", high.len());

    println!("=== Verification ===\n");

    assert_eq!(
        low.len(),
        high.len(),
        "Block count mismatch: concurrency=1 got {}, concurrency=50 got {}",
        low.len(),
        high.len()
    );
    println!("PASS: Same block count ({})", low.len());

    for (a, b) in low.iter().zip(high.iter()) {
        assert_eq!(
            a.0, b.0,
            "Slot order mismatch: concurrency=1 has slot {}, concurrency=50 has slot {}",
            a.0, b.0
        );
        assert_eq!(
            a.1, b.1,
            "Transaction count mismatch at slot {}: concurrency=1 has {}, concurrency=50 has {}",
            a.0, a.1, b.1
        );
    }
    println!("PASS: Identical slot ordering");
    println!("PASS: Identical transaction counts per slot");

    println!("\nSlots processed:");
    for (slot, tx_count) in &low {
        println!("  slot={} txs={}", slot, tx_count);
    }

    println!("\nAll concurrency checks passed.");
}