// quicknode-cascade 0.2.3
//
// Stream blockchain data at scale. Plugin-based framework powered by QuickNode Cascade — start with Solana, more chains coming.
// Documentation
//! CascadeRunner — the builder-pattern engine that wires everything together.
//!
//! Chain-agnostic runner that handles parallel fetching, retry-forever logic,
//! plugin dispatch, cursor management, and graceful shutdown.

use anyhow::Result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use crate::solana::Plugin;
use crate::source;

/// Which of the two engines `CascadeRunner::run_async` dispatches to.
///
/// Selected by the `backfill`, `live` and `live_from` builder methods.
enum RunMode {
    /// Process a fixed slot range, then finish. The bounds are passed straight
    /// through to `source::backfill::run_backfill`.
    // NOTE(review): whether `end` is inclusive is decided inside `run_backfill`,
    // which is not visible from this file — confirm before documenting publicly.
    Backfill { start: u64, end: u64 },
    /// Follow the chain tip until shutdown. `start_slot` is `Some` only when the
    /// caller used `live_from`.
    // NOTE(review): what `None` means (resume from cursor vs. start at tip) is
    // decided inside `source::live::run_live` — confirm there.
    Live { start_slot: Option<u64> },
}

/// Builder-pattern runner for the streaming engine.
///
/// Handles all parallel fetching, retry-forever logic, plugin dispatch,
/// cursor management, and graceful shutdown.
///
/// Configure the runner with the chained builder methods, register at least
/// one plugin, then call `run` (blocking) or `run_async` (inside an existing
/// tokio runtime).
///
/// # Example
///
/// ```rust,no_run
/// use quicknode_cascade::{CascadeRunner, solana};
///
/// struct Logger;
/// impl solana::Plugin for Logger {
///     fn name(&self) -> &'static str { "logger" }
///     fn on_block<'a>(&'a self, block: &'a solana::BlockData) -> solana::PluginFuture<'a> {
///         Box::pin(async move {
///             println!("slot {}", block.slot);
///             Ok(())
///         })
///     }
/// }
///
/// CascadeRunner::solana_mainnet()
///     .auth_token("your-jwt-token")
///     .backfill(400_000_000, 400_000_010)
///     .with_plugin(Box::new(Logger))
///     .run()
///     .expect("runner failed");
/// ```
pub struct CascadeRunner {
    /// Endpoint URL blocks are fetched from. Derived from the chain name by
    /// `chain`, or replaced wholesale by `source_url`.
    source: String,
    /// Optional JWT for the endpoint; `None` until `auth_token` is called.
    auth_token: Option<String>,
    /// Plugins added through `with_plugin`, kept in registration order.
    plugins: Vec<Box<dyn Plugin>>,
    /// Parallel fetch concurrency (default 10). `run_async` rejects 0.
    concurrency: usize,
    /// Block encoding requested from the endpoint (default `"json"`).
    encoding: String,
    /// Backfill or live; defaults to live with no explicit start slot.
    mode: RunMode,
    /// Path of the cursor file used for resume support (default `"cursor.json"`).
    cursor_file: String,
    /// Number of slots to stay behind the chain tip in live mode (default 100).
    tip_buffer: u64,
}

impl CascadeRunner {
    /// Create a runner for Solana mainnet via QuickNode Cascade.
    ///
    /// Shorthand for `CascadeRunner::chain("solana-mainnet")`.
    pub fn solana_mainnet() -> Self {
        Self::chain("solana-mainnet")
    }

    /// Create a runner for Solana devnet via QuickNode Cascade.
    ///
    /// Shorthand for `CascadeRunner::chain("solana-devnet")`.
    pub fn solana_devnet() -> Self {
        Self::chain("solana-devnet")
    }

    /// Create a runner for a named chain.
    ///
    /// The chain name maps to the Cascade endpoint:
    /// `https://{chain}-cascade.quiknode.io`
    ///
    /// The returned runner starts with these defaults: no auth token, no
    /// plugins, concurrency 10, `"json"` encoding, live mode with no explicit
    /// start slot, cursor file `"cursor.json"`, and a tip buffer of 100 slots.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use quicknode_cascade::CascadeRunner;
    ///
    /// // These are equivalent:
    /// let a = CascadeRunner::solana_mainnet();
    /// let b = CascadeRunner::chain("solana-mainnet");
    /// ```
    pub fn chain(name: &str) -> Self {
        let source = format!("https://{}-cascade.quiknode.io", name);
        Self {
            source,
            auth_token: None,
            plugins: Vec::new(),
            concurrency: 10,
            encoding: "json".to_string(),
            mode: RunMode::Live { start_slot: None },
            cursor_file: "cursor.json".to_string(),
            tip_buffer: 100,
        }
    }

    /// Set the authentication token (JWT) for the Cascade endpoint.
    ///
    /// The token is sent as `Authorization: Bearer <token>` on every request.
    pub fn auth_token(mut self, token: &str) -> Self {
        self.auth_token = Some(token.to_string());
        self
    }

    /// Override the source URL directly (for custom RPC endpoints).
    ///
    /// Use this when pointing at your own Solana validator or a non-Cascade RPC.
    /// This replaces the URL that [`chain`](Self::chain) derived from the chain name.
    pub fn source_url(mut self, url: &str) -> Self {
        self.source = url.to_string();
        self
    }

    /// Register a plugin. Multiple plugins can be registered; each sees all events.
    ///
    /// Plugins are loaded, and later exited, in registration order.
    pub fn with_plugin(mut self, plugin: Box<dyn Plugin>) -> Self {
        self.plugins.push(plugin);
        self
    }

    /// Set parallel fetch concurrency (default: 10).
    ///
    /// A value of 0 is accepted here but rejected when the runner starts.
    pub fn concurrency(mut self, n: usize) -> Self {
        self.concurrency = n;
        self
    }

    /// Set the encoding for fetching blocks (default: "json").
    ///
    /// `on_raw` fires for ALL encodings with the raw JSON.
    ///
    /// In `"json"` mode, the runner additionally extracts structured data and calls
    /// `on_block`/`on_transaction`/`on_token_transfer`/`on_account_activity`.
    ///
    /// In any other mode (`"jsonParsed"`, `"base64"`, etc.), only `on_raw` fires.
    pub fn encoding(mut self, encoding: &str) -> Self {
        self.encoding = encoding.to_string();
        self
    }

    /// Set the runner to backfill a specific slot range.
    ///
    /// Replaces any previously selected mode.
    pub fn backfill(mut self, start: u64, end: u64) -> Self {
        self.mode = RunMode::Backfill { start, end };
        self
    }

    /// Set the runner to follow the chain tip in real-time.
    ///
    /// This is the default mode; calling it replaces any previously selected mode.
    pub fn live(mut self) -> Self {
        self.mode = RunMode::Live { start_slot: None };
        self
    }

    /// Set the runner to follow the chain tip, starting from a specific slot.
    ///
    /// Replaces any previously selected mode.
    pub fn live_from(mut self, slot: u64) -> Self {
        self.mode = RunMode::Live {
            start_slot: Some(slot),
        };
        self
    }

    /// Set the cursor file path for resume support (default: "cursor.json").
    pub fn cursor_file(mut self, path: &str) -> Self {
        self.cursor_file = path.to_string();
        self
    }

    /// Set the tip buffer for live mode (default: 100 slots).
    ///
    /// Keeps the client N slots behind the chain tip to ensure data
    /// availability on Cascade. Set to 0 for direct RPC endpoints.
    pub fn tip_buffer(mut self, slots: u64) -> Self {
        self.tip_buffer = slots;
        self
    }

    /// Run the engine. Blocks until completion (backfill) or shutdown signal (live).
    ///
    /// Creates a multi-threaded tokio runtime internally. If called from within
    /// an async context, use [`run_async`](Self::run_async) instead.
    ///
    /// # Errors
    ///
    /// Returns an error if the tokio runtime cannot be built, or any error
    /// produced by [`run_async`](Self::run_async).
    ///
    /// # Panics
    ///
    /// tokio's `Runtime::block_on` panics when invoked from inside an
    /// asynchronous execution context, so this method must not be called from
    /// async code.
    pub fn run(self) -> Result<()> {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()?;
        rt.block_on(self.run_async())
    }

    /// Async version of [`run`](Self::run).
    ///
    /// Use this from within an existing tokio runtime.
    ///
    /// Sequence: validate configuration, install a Ctrl-C listener that flips
    /// the shared shutdown flag, call `on_load` on every plugin, run the
    /// selected mode, then call `on_exit` on every plugin.
    ///
    /// # Errors
    ///
    /// - No plugin was registered.
    /// - Concurrency was set to 0.
    /// - A plugin's `on_load` failed; plugins loaded before it get `on_exit`
    ///   called before the error is returned.
    /// - The backfill or live engine returned an error. Plugins still get
    ///   `on_exit` called first.
    ///
    /// Errors returned by `on_exit` are logged as warnings and never replace
    /// the result of the run.
    pub async fn run_async(self) -> Result<()> {
        if self.plugins.is_empty() {
            return Err(anyhow::anyhow!(
                "No plugins registered. Call .with_plugin() before .run()"
            ));
        }
        if self.concurrency == 0 {
            return Err(anyhow::anyhow!("Concurrency must be at least 1"));
        }

        let plugins: Vec<Arc<dyn Plugin>> =
            self.plugins.into_iter().map(Arc::from).collect();
        let plugins = Arc::new(plugins);

        let shutdown = Arc::new(AtomicBool::new(false));
        let shutdown_flag = Arc::clone(&shutdown);
        let signal_task = tokio::spawn(async move {
            match tokio::signal::ctrl_c().await {
                Ok(()) => {
                    tracing::info!("Shutdown signal received");
                    shutdown_flag.store(true, Ordering::SeqCst);
                }
                // A listener that could not be installed is not a shutdown
                // request: leave the flag untouched so the run proceeds.
                Err(e) => {
                    tracing::warn!("Failed to listen for shutdown signal: {}", e);
                }
            }
        });

        for (i, plugin) in plugins.iter().enumerate() {
            tracing::info!("Loading plugin: {}", plugin.name());
            if let Err(e) = plugin.on_load().await {
                // Roll back: only the plugins that loaded successfully are exited.
                Self::exit_plugins(&plugins[..i]).await;
                signal_task.abort();
                return Err(anyhow::anyhow!(
                    "Plugin '{}' on_load failed: {}",
                    plugin.name(),
                    e
                ));
            }
        }

        let result = match self.mode {
            RunMode::Backfill { start, end } => {
                source::backfill::run_backfill(
                    &self.source,
                    self.auth_token.as_deref(),
                    Arc::clone(&plugins),
                    &self.encoding,
                    start,
                    end,
                    self.concurrency,
                    &self.cursor_file,
                    shutdown,
                )
                .await
            }
            RunMode::Live { start_slot } => {
                source::live::run_live(
                    &self.source,
                    self.auth_token.as_deref(),
                    Arc::clone(&plugins),
                    &self.encoding,
                    start_slot,
                    &self.cursor_file,
                    shutdown,
                    self.tip_buffer,
                    self.concurrency,
                )
                .await
            }
        };

        // The run is over, so nothing reads the shutdown flag any more. Stop
        // the listener so it does not outlive this call in a caller-owned runtime.
        signal_task.abort();

        Self::exit_plugins(plugins.as_slice()).await;

        result
    }

    /// Call `on_exit` on each plugin in order, logging failures instead of
    /// propagating them so one misbehaving plugin cannot block the others.
    async fn exit_plugins(plugins: &[Arc<dyn Plugin>]) {
        for plugin in plugins {
            if let Err(e) = plugin.on_exit().await {
                tracing::warn!("Plugin '{}' on_exit error: {}", plugin.name(), e);
            }
        }
    }
}