chaindexing 0.1.80

Index any EVM chain and query in SQL
Documentation
mod error;
mod filters;
mod ingest_events;
mod maybe_handle_chain_reorg;
mod provider;

pub use error::IngesterError;
pub use provider::{Provider, ProviderError};

use std::cmp::max;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use chrono::Utc;
use futures_util::StreamExt;
use tokio::sync::Mutex;
use tokio::time::interval;

use crate::contracts;
use crate::nodes::NodeTask;
use crate::pruning::PruningConfig;
use crate::states;
use crate::streams::ContractAddressesStream;
use crate::Chain;
use crate::ChainId;
use crate::Config;
use crate::Contract;
use crate::ContractAddress;
use crate::{ChaindexingRepo, ChaindexingRepoClient, ChaindexingRepoConn};
use crate::{ExecutesWithRawQuery, HasRawQueryClient, Repo};

/// Spawns the ingestion loops and returns the [`NodeTask`] they were added to.
///
/// The configured chains are split into groups by [`get_chunked_chains`] and one
/// Tokio task is spawned per group. Each task loops forever: it walks its chains
/// in order, runs [`ingest_for_chain`] for each of them, and then waits for the
/// next tick of an interval of `config.ingestion_rate_ms` milliseconds.
///
/// A new provider, repo client, pool (`get_pool(1)`) and connection are obtained
/// for every chain on every pass.
///
/// # Panics
///
/// The spawned task panics if [`ingest_for_chain`] returns an error, because the
/// result is unwrapped.
pub async fn start<S: Sync + Send + Clone + 'static>(config: &Config<S>) -> NodeTask {
    let node_task = NodeTask::new();

    for chain_group in get_chunked_chains(config) {
        // The spawned future is `async move`, so it gets its own copy of the config.
        let config = config.clone();

        let subtask = tokio::spawn(async move {
            let tick_period = Duration::from_millis(config.ingestion_rate_ms);
            let mut ticker = interval(tick_period);
            // Shared across passes so pruning can be paced per chain.
            let mut last_pruned_at_per_chain_id = HashMap::new();

            loop {
                for chain in &chain_group {
                    let provider = provider::get(&chain.json_rpc_url);
                    let repo_client = Arc::new(Mutex::new(config.repo.get_client().await));
                    let pool = config.repo.get_pool(1).await;
                    let conn = Arc::new(Mutex::new(ChaindexingRepo::get_conn(&pool).await));

                    let ingestion = ingest_for_chain(
                        &chain.id,
                        provider,
                        Arc::clone(&conn),
                        &repo_client,
                        &config,
                        &mut last_pruned_at_per_chain_id,
                    );
                    ingestion.await.unwrap();
                }

                ticker.tick().await;
            }
        });

        node_task.add_subtask(subtask).await;
    }

    node_task
}

/// Splits the configured chains into at most `config.chain_concurrency` groups.
///
/// Groups keep the original chain order and are as evenly sized as possible; only
/// the last group may be shorter. Each group is meant to be processed by its own
/// task, so the number of groups is the number of chains handled concurrently.
///
/// A `chain_concurrency` of zero is treated as one (a single group holding every
/// chain) instead of dividing by zero. An empty chain list yields no groups.
pub fn get_chunked_chains<S: Send + Sync + Clone + 'static>(config: &Config<S>) -> Vec<Vec<Chain>> {
    let chain_count = config.chains.len();
    // Guard against a zero divisor from a misconfigured concurrency.
    let concurrency = max(config.chain_concurrency as usize, 1);

    // Round up: flooring here would produce more groups than `concurrency`
    // whenever the chain count is not an exact multiple of it.
    let has_remainder = chain_count % concurrency != 0;
    let rounded_up = chain_count / concurrency + usize::from(has_remainder);
    // `chunks` panics on a chunk size of zero, which happens when there are no chains.
    let chunk_size = max(rounded_up, 1);

    config
        .chains
        .chunks(chunk_size)
        .map(|group| group.to_vec())
        .collect()
}

/// Runs a single ingestion pass for one chain.
///
/// The current block number is fetched once from `provider`. Contract addresses
/// are then pulled from a `ContractAddressesStream` created for this chain id, in
/// chunks of 5. For each chunk, the addresses kept by
/// `filter_uningested_contract_addresses` are passed to `ingest_events::run` and
/// afterwards to `maybe_handle_chain_reorg::run`. Once the stream is exhausted,
/// `maybe_prune` is called with the same block number.
///
/// Both the connection and the repo client are locked for the duration of each
/// chunk and released before the next chunk is requested from the stream.
///
/// # Errors
///
/// Returns the first [`IngesterError`] produced by event ingestion or reorg
/// handling; later chunks and the pruning step are then skipped.
pub async fn ingest_for_chain<'a, S: Send + Sync + Clone>(
    chain_id: &ChainId,
    provider: Arc<impl Provider>,
    conn: Arc<Mutex<ChaindexingRepoConn<'a>>>,
    repo_client: &Arc<Mutex<ChaindexingRepoClient>>,
    config @ Config {
        contracts,
        pruning_config,
        ..
    }: &Config<S>,
    last_pruned_at_per_chain_id: &mut HashMap<u64, u64>,
) -> Result<(), IngesterError> {
    let current_block_number = provider::fetch_current_block_number(&provider).await;

    let stream_chain_id = *chain_id as i64;
    let mut address_chunks =
        ContractAddressesStream::new(repo_client, stream_chain_id).with_chunk_size(5);

    while let Some(address_chunk) = address_chunks.next().await {
        let ingestable_addresses =
            filter_uningested_contract_addresses(&address_chunk, current_block_number);

        // Lock order matters for consistency: connection first, then the client.
        let mut conn_guard = conn.lock().await;
        let client_guard = repo_client.lock().await;
        let client = &*client_guard;

        // Both steps take the addresses by value, hence the clone for the first one.
        ingest_events::run(
            &mut conn_guard,
            client,
            ingestable_addresses.clone(),
            &provider,
            chain_id,
            current_block_number,
            config,
        )
        .await?;

        maybe_handle_chain_reorg::run(
            &mut conn_guard,
            ingestable_addresses,
            &provider,
            chain_id,
            current_block_number,
            config,
        )
        .await?;
    }

    let prune_client = repo_client.lock().await;
    maybe_prune(
        pruning_config,
        last_pruned_at_per_chain_id,
        contracts,
        *chain_id as u64,
        current_block_number,
        &*prune_client,
    )
    .await;

    Ok(())
}

/// Prunes old events and state versions for `chain_id` once the configured
/// `prune_interval` has elapsed since the last prune.
///
/// Does nothing when `pruning_config` is `None`. The first time a chain is seen,
/// its clock is started at the current time, so its first prune happens one
/// interval later (or immediately when the interval is zero).
///
/// * `last_pruned_at_per_chain_id` - Unix timestamps (seconds) of the last prune,
///   keyed by chain id; updated in place.
/// * `contracts` - source of the state migrations whose tables get pruned.
/// * `current_block_number` - passed to `PruningConfig::get_min_block_number` to
///   derive the pruning threshold.
async fn maybe_prune<S: Send + Sync + Clone>(
    pruning_config: &Option<PruningConfig>,
    last_pruned_at_per_chain_id: &mut HashMap<u64, u64>,
    contracts: &[Contract<S>],
    chain_id: u64,
    current_block_number: u64,
    repo_client: &ChaindexingRepoClient,
) {
    if let Some(pruning_config @ PruningConfig { prune_interval, .. }) = pruning_config {
        let now = Utc::now().timestamp() as u64;
        let last_pruned_at = *last_pruned_at_per_chain_id.entry(chain_id).or_insert(now);

        // Saturate so a wall clock that steps backwards cannot underflow.
        let elapsed = now.saturating_sub(last_pruned_at);
        if elapsed >= *prune_interval {
            let min_pruning_block_number =
                pruning_config.get_min_block_number(current_block_number);

            ChaindexingRepo::prune_events(repo_client, min_pruning_block_number, chain_id).await;

            let state_migrations = contracts::get_state_migrations(contracts);
            let state_table_names = states::get_all_table_names(&state_migrations);
            states::prune_state_versions(
                &state_table_names,
                repo_client,
                min_pruning_block_number,
                chain_id,
            )
            .await;

            // Refresh the timestamp only after an actual prune. Refreshing it on
            // every call would keep the elapsed time near zero, so a non-zero
            // interval would never be reached.
            last_pruned_at_per_chain_id.insert(chain_id, Utc::now().timestamp() as u64);
        }
    }
}

/// Returns clones of the contract addresses that are due for ingestion.
///
/// An address is kept when its `next_block_number_to_ingest_from`, cast to `u64`,
/// is not greater than `current_block_number`. Input order is preserved.
fn filter_uningested_contract_addresses(
    contract_addresses: &[ContractAddress],
    current_block_number: u64,
) -> Vec<ContractAddress> {
    let mut due_addresses = Vec::new();

    for contract_address in contract_addresses {
        // NOTE(review): if the field is a signed integer, a negative value wraps
        // to a very large `u64` here and the address is excluded — confirm.
        let next_block_number = contract_address.next_block_number_to_ingest_from as u64;

        if next_block_number <= current_block_number {
            due_addresses.push(contract_address.clone());
        }
    }

    due_addresses
}