quicknode-cascade 0.2.3

Stream blockchain data at scale. Plugin-based framework powered by QuickNode Cascade — start with Solana, more chains coming.
Documentation
//! Structured data extraction from Solana JSON-RPC `getBlock` responses.
//!
//! Parses block metadata, transactions, token balance changes, and
//! account SOL balance changes into typed records for plugin dispatch.

use super::types::{
    AccountActivityData, BlockData, SlotExtract, TokenTransferData, TransactionData,
};

/// Base58 address of Solana's native Vote program.
///
/// A transaction is classified as a vote when this address appears among its
/// account keys. The address is 43 characters long (`Vote` followed by 39
/// `1`s); a shorter literal never matches a real account key.
const VOTE_PROGRAM: &str = "Vote111111111111111111111111111111111111111";

/// Turn one `getBlock` JSON-RPC result into a [`SlotExtract`].
///
/// The block value is consumed. Its `transactions` array is detached and each
/// element is moved into the matching `TransactionData::raw`, while the rest of
/// the block becomes `BlockData::raw`, so no JSON is deep-cloned.
///
/// Missing or mistyped fields never fail the extraction; they fall back to
/// `""`, `0`, `None`, or an empty list. A transaction whose `meta.err` is
/// absent is reported as successful.
pub(crate) fn extract_block(slot: u64, mut block: serde_json::Value) -> SlotExtract {
    // Header fields are read before the transactions array is detached below.
    let header_string = |field: &str| -> String {
        block
            .get(field)
            .and_then(serde_json::Value::as_str)
            .unwrap_or("")
            .to_string()
    };
    let blockhash = header_string("blockhash");
    let parent_blockhash = header_string("previousBlockhash");
    let parent_slot = block
        .get("parentSlot")
        .and_then(serde_json::Value::as_u64)
        .unwrap_or(0);
    let block_height = block.get("blockHeight").and_then(serde_json::Value::as_u64);
    let block_time = block.get("blockTime").and_then(serde_json::Value::as_i64);

    // Detach the transactions so they can be moved, not copied, into the
    // per-transaction records. Anything other than an array yields no
    // transactions (the key is still removed from the block).
    let txs_vec: Vec<serde_json::Value> = match block
        .as_object_mut()
        .and_then(|obj| obj.remove("transactions"))
    {
        Some(serde_json::Value::Array(items)) => items,
        _ => Vec::new(),
    };
    let tx_count = txs_vec.len();

    let mut transactions = Vec::with_capacity(tx_count);
    let mut token_transfers = Vec::with_capacity(tx_count);
    let mut account_activity = Vec::with_capacity(tx_count * 4);

    for (idx, tx_val) in txs_vec.into_iter().enumerate() {
        let tx_index = idx as u32;

        // Both sub-objects are looked up once and reused; the borrows end
        // before `tx_val` is moved into the record at the bottom of the loop.
        let meta = tx_val.get("meta");
        let envelope = tx_val.get("transaction");

        let signature = extract_first_signature(envelope);
        let is_vote = detect_vote(envelope);

        let fee = meta
            .and_then(|m| m.get("fee"))
            .and_then(serde_json::Value::as_u64)
            .unwrap_or(0);
        let compute_units_consumed = meta
            .and_then(|m| m.get("computeUnitsConsumed"))
            .and_then(serde_json::Value::as_u64);
        let success = match meta.and_then(|m| m.get("err")) {
            Some(err) => err.is_null(),
            None => true,
        };
        let log_messages = meta
            .and_then(|m| m.get("logMessages"))
            .and_then(serde_json::Value::as_array)
            .map(|lines| {
                lines
                    .iter()
                    .filter_map(|line| line.as_str().map(|text| text.to_string()))
                    .collect()
            });
        let pre_balances = extract_u64_array(meta.and_then(|m| m.get("preBalances")));
        let post_balances = extract_u64_array(meta.and_then(|m| m.get("postBalances")));

        extract_token_transfers(
            slot,
            tx_index,
            &signature,
            meta,
            block_time,
            &mut token_transfers,
        );

        extract_accounts(
            slot,
            tx_index,
            &signature,
            envelope,
            &pre_balances,
            &post_balances,
            block_time,
            &mut account_activity,
        );

        transactions.push(TransactionData {
            slot,
            tx_index,
            signature,
            success,
            fee,
            compute_units_consumed,
            is_vote,
            pre_balances,
            post_balances,
            log_messages,
            block_time,
            raw: tx_val,
        });
    }

    SlotExtract {
        block: BlockData {
            slot,
            blockhash,
            parent_slot,
            parent_blockhash,
            block_time,
            block_height,
            transaction_count: tx_count as u64,
            raw: block,
        },
        transactions,
        token_transfers,
        account_activity,
    }
}

/// Return the first entry of the transaction's `signatures` array.
///
/// Yields an empty string when the transaction is absent, has no
/// `signatures` array, the array is empty, or its first entry is not a string.
fn extract_first_signature(transaction: Option<&serde_json::Value>) -> String {
    let first = transaction
        .and_then(|tx| tx.get("signatures"))
        // Indexing a JSON value by position only succeeds on arrays.
        .and_then(|sigs| sigs.get(0))
        .and_then(|sig| sig.as_str());

    match first {
        Some(sig) => sig.to_owned(),
        None => String::new(),
    }
}

/// Collect the unsigned-integer elements of a JSON array.
///
/// Returns an empty vector when `val` is absent or not an array. Elements that
/// are not representable as `u64` are skipped, so the result can be shorter
/// than the input and later elements may shift to lower indices.
fn extract_u64_array(val: Option<&serde_json::Value>) -> Vec<u64> {
    val.and_then(|v| v.as_array())
        .map(|items| items.iter().filter_map(|item| item.as_u64()).collect())
        .unwrap_or_default()
}

/// Report whether the transaction references the Vote program.
///
/// Looks at `message.accountKeys` and returns `true` if any entry is a string
/// equal to `VOTE_PROGRAM`. Returns `false` when the key list is missing or
/// not an array; non-string entries never match.
fn detect_vote(transaction: Option<&serde_json::Value>) -> bool {
    transaction
        .and_then(|tx| tx.get("message"))
        .and_then(|msg| msg.get("accountKeys"))
        .and_then(|keys| keys.as_array())
        .map_or(false, |keys| {
            keys.iter().any(|key| key.as_str() == Some(VOTE_PROGRAM))
        })
}

/// Append one [`TokenTransferData`] per token balance that changed in a transaction.
///
/// Balances are read from `meta.preTokenBalances` and `meta.postTokenBalances`
/// and paired by `(accountIndex, mint)`. A balance present on only one side is
/// treated as `"0"` on the other. Pairs whose amount strings are equal are
/// skipped. Records are appended to `out` in ascending key order. With no
/// `meta`, nothing is appended.
fn extract_token_transfers(
    slot: u64,
    tx_index: u32,
    signature: &str,
    meta: Option<&serde_json::Value>,
    block_time: Option<i64>,
    out: &mut Vec<TokenTransferData>,
) {
    // A missing `meta` simply produces two empty maps, hence no output.
    let balances_of = |field: &str| {
        build_token_balance_map(meta.and_then(|m| m.get(field)).and_then(|v| v.as_array()))
    };
    let before = balances_of("preTokenBalances");
    let after = balances_of("postTokenBalances");

    // Union of the keys from both sides, sorted so the output order is deterministic.
    let mut balance_keys: Vec<&(u32, String)> = before.keys().chain(after.keys()).collect();
    balance_keys.sort_unstable();
    balance_keys.dedup();

    for balance_key in balance_keys {
        let pre = before.get(balance_key);
        let post = after.get(balance_key);

        // Mint, owner, and decimals come from the pre-side entry when there is one.
        let identity = match pre.or(post) {
            Some(entry) => entry,
            None => continue,
        };

        let pre_amount = pre.map_or("0", |entry| entry.amount.as_str());
        let post_amount = post.map_or("0", |entry| entry.amount.as_str());
        if pre_amount == post_amount {
            continue;
        }

        out.push(TokenTransferData {
            slot,
            tx_index,
            signature: signature.to_string(),
            mint: identity.mint.clone(),
            owner: identity.owner.clone(),
            pre_amount: pre_amount.to_string(),
            post_amount: post_amount.to_string(),
            decimals: identity.decimals,
            block_time,
        });
    }
}

/// One parsed element of a `preTokenBalances` / `postTokenBalances` array.
struct TokenBalanceEntry {
    /// Value of the entry's `mint` field (`""` if absent).
    mint: String,
    /// Value of the entry's `owner` field (`""` if absent).
    owner: String,
    /// Value of `uiTokenAmount.amount`, kept as the original string (`"0"` if absent).
    amount: String,
    /// Value of `uiTokenAmount.decimals` (`0` if absent).
    decimals: u8,
}

/// Index a token-balance array by `(accountIndex, mint)`.
///
/// Missing fields fall back to defaults: `accountIndex` and `decimals` to `0`,
/// `mint` and `owner` to `""`, and `uiTokenAmount.amount` to `"0"`. If two
/// elements share a key, the later one wins. `None` yields an empty map.
fn build_token_balance_map(
    balances: Option<&Vec<serde_json::Value>>,
) -> std::collections::HashMap<(u32, String), TokenBalanceEntry> {
    /// Read a top-level string field, defaulting to the empty string.
    fn str_field(obj: &serde_json::Value, name: &str) -> String {
        obj.get(name)
            .and_then(|v| v.as_str())
            .unwrap_or("")
            .to_string()
    }

    let mut by_account =
        std::collections::HashMap::with_capacity(balances.map_or(0, Vec::len));

    for entry in balances.into_iter().flatten() {
        let ui_amount = entry.get("uiTokenAmount");

        let account_index = entry
            .get("accountIndex")
            .and_then(|v| v.as_u64())
            .unwrap_or(0) as u32;
        let mint = str_field(entry, "mint");
        let owner = str_field(entry, "owner");
        let amount = ui_amount
            .and_then(|u| u.get("amount"))
            .and_then(|v| v.as_str())
            .unwrap_or("0")
            .to_string();
        let decimals = ui_amount
            .and_then(|u| u.get("decimals"))
            .and_then(|v| v.as_u64())
            .unwrap_or(0) as u8;

        by_account.insert(
            (account_index, mint.clone()),
            TokenBalanceEntry {
                mint,
                owner,
                amount,
                decimals,
            },
        );
    }

    by_account
}

/// Append one [`AccountActivityData`] per account key of a transaction.
///
/// Account keys come from `message.accountKeys`; entries that are not strings
/// are skipped but still occupy their index. The balance at the same index in
/// `pre_balances` / `post_balances` is attached to each account, defaulting to
/// `0` when the slice is too short.
///
/// * `is_signer` is set for indices below `message.header.numRequiredSignatures`
///   (treated as `0` when absent).
/// * `is_fee_payer` is set for index `0` only.
/// * `balance_change` is `post - pre`, saturated to the `i64` range.
///
/// Nothing is appended when the transaction has no `accountKeys` array.
fn extract_accounts(
    slot: u64,
    tx_index: u32,
    signature: &str,
    transaction: Option<&serde_json::Value>,
    pre_balances: &[u64],
    post_balances: &[u64],
    block_time: Option<i64>,
    out: &mut Vec<AccountActivityData>,
) {
    let message = transaction.and_then(|t| t.get("message"));

    let keys = match message
        .and_then(|m| m.get("accountKeys"))
        .and_then(|a| a.as_array())
    {
        Some(k) => k,
        None => return,
    };

    let num_required_sigs = message
        .and_then(|m| m.get("header"))
        .and_then(|h| h.get("numRequiredSignatures"))
        .and_then(|v| v.as_u64())
        .unwrap_or(0) as usize;

    for (i, key) in keys.iter().enumerate() {
        let account = match key.as_str() {
            Some(s) => s.to_string(),
            None => continue,
        };

        let pre = pre_balances.get(i).copied().unwrap_or(0);
        let post = post_balances.get(i).copied().unwrap_or(0);

        // Subtract in i128 so that u64 values above i64::MAX are neither
        // reinterpreted as negative nor able to overflow the subtraction
        // (which would panic in debug builds); then saturate into i64.
        let change = (i128::from(post) - i128::from(pre))
            .clamp(i128::from(i64::MIN), i128::from(i64::MAX)) as i64;

        out.push(AccountActivityData {
            slot,
            tx_index,
            signature: signature.to_string(),
            account,
            pre_balance: pre,
            post_balance: post,
            balance_change: change,
            is_signer: i < num_required_sigs,
            is_fee_payer: i == 0,
            block_time,
        });
    }
}