cherry-ingest 0.0.3

Library for ingesting evm data using common a query/response format
Documentation
use std::{collections::BTreeMap, pin::Pin, sync::Arc};

use anyhow::{Context, Result};
use arrow::record_batch::RecordBatch;
use futures_lite::{Stream, StreamExt};
use reqwest::Url;

pub mod evm;

#[derive(Debug, Clone)]
pub struct StreamConfig {
    pub format: Format,
    pub provider: Provider,
}

#[derive(Debug, Clone)]
pub enum Format {
    Evm(evm::Query),
}

#[derive(Debug, Clone)]
pub enum Provider {
    Sqd {
        client_config: sqd_portal_client::ClientConfig,
        url: Url,
    },
}

#[allow(clippy::type_complexity)]
pub fn start_stream(
    cfg: StreamConfig,
) -> Result<Pin<Box<dyn Stream<Item = Result<BTreeMap<String, RecordBatch>>> + Send + Sync>>> {
    match cfg.provider {
        Provider::Sqd { client_config, url } => match cfg.format {
            Format::Evm(evm_query) => {
                let evm_query = evm_query.to_sqd().context("convert to sqd query")?;

                let client = sqd_portal_client::Client::new(url, client_config);
                let client = Arc::new(client);

                let receiver = client.evm_arrow_finalized_stream(
                    evm_query,
                    sqd_portal_client::StreamConfig {
                        stop_on_head: true,
                        ..Default::default()
                    },
                );

                let stream = tokio_stream::wrappers::ReceiverStream::new(receiver);

                let stream = stream.map(|v| {
                    v.map(|v| {
                        let mut data = BTreeMap::new();

                        data.insert("blocks".to_owned(), v.blocks);
                        data.insert("transactions".to_owned(), v.transactions);
                        data.insert("logs".to_owned(), v.logs);
                        data.insert("traces".to_owned(), v.traces);

                        data
                    })
                });

                Ok(Box::pin(stream))
            }
        },
    }
}