quicknode-cascade 0.2.3

Stream blockchain data at scale. Plugin-based framework powered by QuickNode Cascade — start with Solana, more chains coming.
Documentation
//! Custom extraction — parse the raw JSON yourself.
//!
//! Uses `on_raw` to extract inner instructions and program invocation
//! counts — data the built-in pipeline doesn't provide.
//!
//!   cargo run --release --example custom_extraction

use quicknode_cascade::{CascadeRunner, solana};

/// Stateless plugin that prints per-slot instruction totals and the
/// most frequently invoked programs, computed from the raw block JSON.
struct ProgramCounter;

impl solana::Plugin for ProgramCounter {
    /// Identifier this plugin reports for itself.
    fn name(&self) -> &'static str {
        "program-counter"
    }

    /// Walks the raw JSON for `slot` and prints a summary to stdout.
    ///
    /// The summary contains the transaction count, the number of top-level
    /// instructions, the number of inner instructions, and up to five
    /// programs ordered by how many top-level instructions invoked them.
    ///
    /// Missing or unexpectedly typed fields are skipped rather than treated
    /// as errors, so the returned future always resolves to `Ok(())`.
    fn on_raw<'a>(&'a self, slot: u64, raw: &'a serde_json::Value) -> solana::PluginFuture<'a> {
        Box::pin(async move {
            // An absent or non-array `transactions` field is treated as "no transactions".
            let txs = raw
                .get("transactions")
                .and_then(|v| v.as_array())
                .map(Vec::as_slice)
                .unwrap_or_default();
            let tx_count = txs.len();

            let mut total_instructions = 0u64;
            let mut total_inner_instructions = 0u64;
            // Keys borrow straight from `raw`, so no program id is copied.
            let mut programs = std::collections::HashMap::<&str, u64>::new();

            for tx in txs {
                // Both lookups below share the same `transaction.message` prefix.
                let message = tx.get("transaction").and_then(|t| t.get("message"));
                let instructions = message
                    .and_then(|m| m.get("instructions"))
                    .and_then(|i| i.as_array());
                let account_keys = message
                    .and_then(|m| m.get("accountKeys"))
                    .and_then(|a| a.as_array());

                if let (Some(ixs), Some(keys)) = (instructions, account_keys) {
                    for ix in ixs {
                        total_instructions += 1;

                        // Resolve `programIdIndex` into `accountKeys`; an index that is
                        // missing, out of range, or pointing at a non-string key is
                        // counted as an instruction but not attributed to a program.
                        let program = ix
                            .get("programIdIndex")
                            .and_then(|v| v.as_u64())
                            .and_then(|idx| usize::try_from(idx).ok())
                            .and_then(|idx| keys.get(idx))
                            .and_then(|k| k.as_str());
                        if let Some(program) = program {
                            *programs.entry(program).or_insert(0) += 1;
                        }
                    }
                }

                // Inner instructions are grouped; sum the size of every group.
                let inner_groups = tx
                    .get("meta")
                    .and_then(|m| m.get("innerInstructions"))
                    .and_then(|i| i.as_array());
                if let Some(inner_groups) = inner_groups {
                    total_inner_instructions += inner_groups
                        .iter()
                        .filter_map(|group| group.get("instructions").and_then(|i| i.as_array()))
                        .map(|ixs| ixs.len() as u64)
                        .sum::<u64>();
                }
            }

            // Highest count first; ties are broken by program id so the output
            // does not depend on `HashMap` iteration order.
            let mut top_programs: Vec<_> = programs.into_iter().collect();
            top_programs.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(b.0)));

            println!(
                "slot={} txs={} instructions={} inner_instructions={}",
                slot, tx_count, total_instructions, total_inner_instructions
            );
            for (program, count) in top_programs.iter().take(5) {
                println!("  {}: {} calls", program, count);
            }

            Ok(())
        })
    }
}

/// Runs the example: backfills a short range of Solana mainnet slots with
/// [`ProgramCounter`] attached as the only plugin.
///
/// # Panics
///
/// Panics if the runner returns an error.
fn main() {
    // Log filter comes from the environment, falling back to `info`.
    // Logs go to stderr, keeping them separate from the plugin's stdout output.
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
        )
        .with_writer(std::io::stderr)
        .init();

    let cursor = "/tmp/cascade_custom_extraction.cursor";
    // Best-effort removal of any cursor left by a previous run; the result is
    // deliberately ignored (the file may simply not exist).
    // NOTE(review): presumably this forces the backfill to restart from the
    // first slot — confirm against `cursor_file` semantics.
    std::fs::remove_file(cursor).ok();

    CascadeRunner::solana_mainnet()
        .backfill(300_000_000, 300_000_002)
        .concurrency(3)
        .cursor_file(cursor)
        .with_plugin(Box::new(ProgramCounter))
        .run()
        // The message is shown only on failure, so it must describe the failure.
        .expect("cascade runner failed");
}