chaindexing 0.1.80

Index any EVM chain and query in SQL
Documentation
use std::collections::HashMap;
use std::sync::Arc;

use futures_util::FutureExt;

use super::filters::{self, Filter};
use super::provider::{self, Provider};
use super::IngesterError;

use crate::chain_reorg::Execution;
use crate::Config;
use crate::{events, ChainId};
use crate::{
    ChaindexingRepo, ChaindexingRepoClient, ChaindexingRepoConn, ContractAddress,
    LoadsDataWithRawQuery, Repo,
};

pub async fn run<'a, S: Send + Sync + Clone>(
    conn: &mut ChaindexingRepoConn<'a>,
    repo_client: &ChaindexingRepoClient,
    contract_addresses: Vec<ContractAddress>,
    provider: &Arc<impl Provider>,
    chain_id: &ChainId,
    current_block_number: u64,
    Config {
        contracts,
        blocks_per_batch,
        ..
    }: &Config<S>,
) -> Result<(), IngesterError> {
    let filters = filters::get(
        &contract_addresses,
        contracts,
        current_block_number,
        *blocks_per_batch,
        &Execution::Main,
    );

    let filters = remove_already_ingested_filters(&filters, &contract_addresses, repo_client).await;

    if !filters.is_empty() {
        let logs = provider::fetch_logs(provider, &filters).await;
        let blocks_by_tx_hash = provider::fetch_blocks_by_number(provider, &logs).await;
        let events = events::get(
            &logs,
            contracts,
            &contract_addresses,
            chain_id,
            &blocks_by_tx_hash,
        );
        let contract_addresses = contract_addresses.clone();

        ChaindexingRepo::run_in_transaction(conn, move |conn| {
            async move {
                ChaindexingRepo::create_events(conn, &events.clone()).await;

                update_next_block_numbers_to_ingest_from(conn, &contract_addresses, &filters).await;

                Ok(())
            }
            .boxed()
        })
        .await?;
    }

    Ok(())
}

async fn remove_already_ingested_filters(
    filters: &Vec<Filter>,
    contract_addresses: &[ContractAddress],
    repo_client: &ChaindexingRepoClient,
) -> Vec<Filter> {
    let current_block_filters: Vec<_> = filters
        .iter()
        .filter(|f| f.value.get_from_block() == f.value.get_to_block())
        .collect();

    if current_block_filters.is_empty() {
        filters.to_owned()
    } else {
        let addresses: Vec<_> = contract_addresses.iter().map(|c| c.address.clone()).collect();

        let latest_ingested_events =
            ChaindexingRepo::load_latest_events(repo_client, &addresses).await;
        let latest_ingested_events =
            latest_ingested_events
                .iter()
                .fold(HashMap::new(), |mut events_by_address, event| {
                    events_by_address.insert(&event.contract_address, event);

                    events_by_address
                });

        let already_ingested_filters = current_block_filters
            .iter()
            .filter(|filter| match latest_ingested_events.get(&filter.address) {
                Some(latest_event) => {
                    latest_event.block_number as u64
                        == filter.value.get_to_block().unwrap().as_u64()
                }
                None => false,
            })
            .fold(HashMap::new(), |mut stale_current_block_filters, filter| {
                stale_current_block_filters.insert(filter.contract_address_id, filter);

                stale_current_block_filters
            });

        filters
            .iter()
            .filter(|f| !already_ingested_filters.contains_key(&f.contract_address_id))
            .cloned()
            .collect::<Vec<_>>()
    }
}

async fn update_next_block_numbers_to_ingest_from<'a>(
    conn: &mut ChaindexingRepoConn<'a>,
    contract_addresses: &[ContractAddress],
    filters: &[Filter],
) {
    let filters_by_contract_address_id = filters::group_by_contract_address_id(filters);

    for (contract_address, filters) in contract_addresses
        .iter()
        .filter_map(|ca| filters_by_contract_address_id.get(&ca.id).map(|f| (ca, f)))
    {
        if let Some(latest_filter) = filters::get_latest(filters) {
            let next_block_number_to_ingest_from = latest_filter.value.get_to_block().unwrap() + 1;

            ChaindexingRepo::update_next_block_number_to_ingest_from(
                conn,
                contract_address,
                next_block_number_to_ingest_from.as_u64() as i64,
            )
            .await
        }
    }
}