cherry-ingest 0.0.4

Library for ingesting evm data using common a query/response format
Documentation
use crate::{evm, DataStream, Format, StreamConfig};
use anyhow::{Context, Result};
use futures_lite::StreamExt;
use std::collections::BTreeMap;

use std::sync::Arc;

pub fn query_to_sqd(query: &evm::Query) -> Result<sqd_portal_client::evm::Query> {
    let hex_encode = |addr: &[u8]| format!("0x{}", faster_hex::hex_string(addr));

    let mut logs: Vec<_> = Vec::with_capacity(query.logs.len());

    for lg in query.logs.iter() {
        let mut topic0 = Vec::with_capacity(lg.topic0.len() + lg.event_signatures.len());

        topic0.extend_from_slice(lg.topic0.as_slice());

        for sig in lg.event_signatures.iter() {
            let t0 = cherry_evm_decode::signature_to_topic0(sig)
                .context("convert event signature to topic0")?;
            topic0.push(evm::Topic(t0));
        }

        let topic0 = topic0
            .into_iter()
            .map(|x| hex_encode(x.0.as_slice()))
            .collect::<Vec<_>>();

        logs.push(sqd_portal_client::evm::LogRequest {
            address: lg
                .address
                .iter()
                .map(|x| hex_encode(x.0.as_slice()))
                .collect(),
            topic0,
            topic1: lg
                .topic1
                .iter()
                .map(|x| hex_encode(x.0.as_slice()))
                .collect(),
            topic2: lg
                .topic2
                .iter()
                .map(|x| hex_encode(x.0.as_slice()))
                .collect(),
            topic3: lg
                .topic3
                .iter()
                .map(|x| hex_encode(x.0.as_slice()))
                .collect(),
            transaction: lg.include_transactions,
            transaction_logs: lg.include_transaction_logs,
            transaction_traces: lg.include_transaction_traces,
        });
    }

    Ok(sqd_portal_client::evm::Query {
        type_: Default::default(),
        from_block: query.from_block,
        to_block: query.to_block,
        include_all_blocks: query.include_all_blocks,
        transactions: query
            .transactions
            .iter()
            .map(|tx| sqd_portal_client::evm::TransactionRequest {
                from: tx
                    .from_
                    .iter()
                    .map(|x| hex_encode(x.0.as_slice()))
                    .collect(),
                to: tx.to.iter().map(|x| hex_encode(x.0.as_slice())).collect(),
                sighash: tx
                    .sighash
                    .iter()
                    .map(|x| hex_encode(x.0.as_slice()))
                    .collect(),
                logs: tx.include_logs,
                traces: tx.include_traces,
                state_diffs: false,
            })
            .collect(),
        logs,
        traces: query
            .traces
            .iter()
            .map(|t| sqd_portal_client::evm::TraceRequest {
                type_: t.type_.clone(),
                create_from: t.from_.iter().map(|x| hex_encode(x.0.as_slice())).collect(),
                call_from: t.from_.iter().map(|x| hex_encode(x.0.as_slice())).collect(),
                call_to: t.to.iter().map(|x| hex_encode(x.0.as_slice())).collect(),
                call_sighash: t
                    .sighash
                    .iter()
                    .map(|x| hex_encode(x.0.as_slice()))
                    .collect(),
                suicide_refund_address: t
                    .address
                    .iter()
                    .map(|x| hex_encode(x.0.as_slice()))
                    .collect(),
                reward_author: t
                    .author
                    .iter()
                    .map(|x| hex_encode(x.0.as_slice()))
                    .collect(),
                transaction: t.include_transactions,
                transaction_logs: t.include_transaction_logs,
                subtraces: t.include_transaction_traces,
                parents: t.include_transaction_traces,
            })
            .collect(),
        state_diffs: Vec::new(),
        fields: sqd_portal_client::evm::Fields {
            block: sqd_portal_client::evm::BlockFields {
                number: query.fields.block.number,
                hash: query.fields.block.hash,
                parent_hash: query.fields.block.parent_hash,
                timestamp: query.fields.block.timestamp,
                transactions_root: query.fields.block.transactions_root,
                receipts_root: query.fields.block.receipts_root,
                state_root: query.fields.block.state_root,
                logs_bloom: query.fields.block.logs_bloom,
                sha3_uncles: query.fields.block.sha3_uncles,
                extra_data: query.fields.block.extra_data,
                miner: query.fields.block.miner,
                nonce: query.fields.block.nonce,
                mix_hash: query.fields.block.mix_hash,
                size: query.fields.block.size,
                gas_limit: query.fields.block.gas_limit,
                gas_used: query.fields.block.gas_used,
                difficulty: query.fields.block.difficulty,
                total_difficulty: query.fields.block.total_difficulty,
                base_fee_per_gas: query.fields.block.base_fee_per_gas,
                blob_gas_used: query.fields.block.blob_gas_used,
                excess_blob_gas: query.fields.block.excess_blob_gas,
                l1_block_number: query.fields.block.l1_block_number,
            },
            transaction: sqd_portal_client::evm::TransactionFields {
                transaction_index: query.fields.transaction.transaction_index,
                hash: query.fields.transaction.hash,
                nonce: query.fields.transaction.nonce,
                from: query.fields.transaction.from_,
                to: query.fields.transaction.to,
                input: query.fields.transaction.input,
                value: query.fields.transaction.value,
                gas: query.fields.transaction.gas,
                gas_price: query.fields.transaction.gas_price,
                max_fee_per_gas: query.fields.transaction.max_fee_per_gas,
                max_priority_fee_per_gas: query.fields.transaction.max_priority_fee_per_gas,
                v: query.fields.transaction.v,
                r: query.fields.transaction.r,
                s: query.fields.transaction.s,
                y_parity: query.fields.transaction.y_parity,
                chain_id: query.fields.transaction.chain_id,
                sighash: query.fields.transaction.sighash,
                contract_address: query.fields.transaction.contract_address,
                gas_used: query.fields.transaction.gas_used,
                cumulative_gas_used: query.fields.transaction.cumulative_gas_used,
                effective_gas_price: query.fields.transaction.effective_gas_price,
                type_: query.fields.transaction.type_,
                status: query.fields.transaction.status,
                max_fee_per_blob_gas: query.fields.transaction.max_fee_per_blob_gas,
                blob_versioned_hashes: query.fields.transaction.blob_versioned_hashes,
                l1_fee: query.fields.transaction.l1_fee,
                l1_fee_scalar: query.fields.transaction.l1_fee_scalar,
                l1_gas_price: query.fields.transaction.l1_gas_price,
                l1_gas_used: false,
                l1_blob_base_fee: query.fields.transaction.l1_blob_base_fee,
                l1_blob_base_fee_scalar: query.fields.transaction.l1_blob_base_fee_scalar,
                l1_base_fee_scalar: query.fields.transaction.l1_base_fee_scalar,
            },
            log: sqd_portal_client::evm::LogFields {
                log_index: query.fields.log.log_index,
                transaction_index: query.fields.log.transaction_index,
                transaction_hash: query.fields.log.transaction_hash,
                address: query.fields.log.address,
                data: query.fields.log.data,
                topics: query.fields.log.topic0
                    || query.fields.log.topic1
                    || query.fields.log.topic2
                    || query.fields.log.topic3,
            },
            trace: sqd_portal_client::evm::TraceFields {
                transaction_index: query.fields.trace.transaction_position,
                trace_address: query.fields.trace.trace_address,
                subtraces: query.fields.trace.subtraces,
                type_: query.fields.trace.type_,
                error: query.fields.trace.error,
                revert_reason: query.fields.trace.error,
                create_from: query.fields.trace.from_,
                create_value: query.fields.trace.value,
                create_gas: query.fields.trace.gas,
                create_init: query.fields.trace.init,
                create_result_gas_used: query.fields.trace.gas_used,
                create_result_code: query.fields.trace.code,
                create_result_address: query.fields.trace.address,
                call_from: query.fields.trace.from_,
                call_to: query.fields.trace.to,
                call_value: query.fields.trace.value,
                call_gas: query.fields.trace.gas,
                call_input: query.fields.trace.input,
                call_sighash: query.fields.trace.sighash,
                call_type: query.fields.trace.type_,
                call_call_type: query.fields.trace.call_type,
                call_result_gas_used: query.fields.trace.gas_used,
                call_result_output: query.fields.trace.output,
                suicide_address: query.fields.trace.address,
                suicide_refund_address: query.fields.trace.refund_address,
                suicide_balance: query.fields.trace.balance,
                reward_author: query.fields.trace.author,
                reward_value: query.fields.trace.value,
                reward_type: query.fields.trace.author,
            },
        },
    })
}

pub fn start_stream(cfg: StreamConfig) -> Result<DataStream> {
    match cfg.format {
        Format::Evm(evm_query) => {
            let evm_query = query_to_sqd(&evm_query).context("convert to sqd query")?;

            let url = cfg
                .provider
                .url
                .context("url is required when using sqd")?
                .parse()
                .context("parse url")?;

            let mut client_config = sqd_portal_client::ClientConfig::default();

            if let Some(v) = cfg.provider.max_num_retries {
                client_config.max_num_retries = v;
            }
            if let Some(v) = cfg.provider.retry_backoff_ms {
                client_config.retry_backoff_ms = v;
            }
            if let Some(v) = cfg.provider.retry_base_ms {
                client_config.retry_base_ms = v;
            }
            if let Some(v) = cfg.provider.retry_ceiling_ms {
                client_config.retry_ceiling_ms = v;
            }
            if let Some(v) = cfg.provider.http_req_timeout_millis {
                client_config.http_req_timeout_millis = v;
            }

            let client = sqd_portal_client::Client::new(url, client_config);
            let client = Arc::new(client);

            let receiver = client.evm_arrow_finalized_stream(
                evm_query,
                sqd_portal_client::StreamConfig {
                    stop_on_head: true,
                    ..Default::default()
                },
            );

            let stream = tokio_stream::wrappers::ReceiverStream::new(receiver);

            let stream = stream.map(|v| {
                v.map(|v| {
                    let mut data = BTreeMap::new();

                    data.insert("blocks".to_owned(), v.blocks);
                    data.insert("transactions".to_owned(), v.transactions);
                    data.insert("logs".to_owned(), v.logs);
                    data.insert("traces".to_owned(), v.traces);

                    data
                })
            });

            Ok(Box::pin(stream))
        }
    }
}