chaindexing 0.1.80

Index any EVM chain and query in SQL
Documentation
use std::collections::HashSet;
use std::sync::Arc;

use futures_util::FutureExt;
use std::cmp::min;

use crate::chain_reorg::{Execution, UnsavedReorgedBlock};
use crate::events::{self, Event};
use crate::Config;
use crate::{ChainId, ChaindexingRepo, ChaindexingRepoConn, ContractAddress, Repo};

use super::filters::{self, Filter};
use super::Provider;
use super::{provider, IngesterError};

pub async fn run<'a, S: Send + Sync + Clone>(
    conn: &mut ChaindexingRepoConn<'a>,
    contract_addresses: Vec<ContractAddress>,
    provider: &Arc<impl Provider>,
    chain_id: &ChainId,
    current_block_number: u64,
    Config {
        contracts,
        min_confirmation_count,
        blocks_per_batch,
        ..
    }: &Config<S>,
) -> Result<(), IngesterError> {
    let filters = filters::get(
        &contract_addresses,
        contracts,
        current_block_number,
        *blocks_per_batch,
        &Execution::Confirmation(min_confirmation_count),
    );

    if !filters.is_empty() {
        let already_ingested_events = get_already_ingested_events(conn, &filters).await;
        let logs = provider::fetch_logs(provider, &filters).await;
        let blocks_by_number = provider::fetch_blocks_by_number(provider, &logs).await;

        let provider_events = events::get(
            &logs,
            contracts,
            &contract_addresses,
            chain_id,
            &blocks_by_number,
        );

        if let Some(added_and_removed_events) =
            get_provider_added_and_removed_events(&already_ingested_events, &provider_events)
        {
            handle_chain_reorg(conn, chain_id, added_and_removed_events).await?;
        }
    }

    Ok(())
}

async fn get_already_ingested_events<'a>(
    conn: &mut ChaindexingRepoConn<'a>,
    filters: &Vec<Filter>,
) -> Vec<Event> {
    let mut already_ingested_events = vec![];
    for filter in filters {
        let from_block = filter.value.get_from_block().unwrap().as_u64();
        let to_block = filter.value.get_to_block().unwrap().as_u64();

        let mut events =
            ChaindexingRepo::get_events(conn, filter.address.to_owned(), from_block, to_block)
                .await;
        already_ingested_events.append(&mut events);
    }

    already_ingested_events
}

async fn handle_chain_reorg<'a>(
    conn: &mut ChaindexingRepoConn<'a>,
    chain_id: &ChainId,
    (added_events, removed_events): (Vec<Event>, Vec<Event>),
) -> Result<(), IngesterError> {
    let earliest_block_number = get_earliest_block_number((&added_events, &removed_events));
    let new_reorged_block = UnsavedReorgedBlock::new(earliest_block_number, chain_id);

    ChaindexingRepo::run_in_transaction(conn, move |conn| {
        async move {
            ChaindexingRepo::create_reorged_block(conn, &new_reorged_block).await;

            let event_ids: Vec<_> = removed_events.iter().map(|e| e.id).collect();
            ChaindexingRepo::delete_events_by_ids(conn, &event_ids).await;

            ChaindexingRepo::create_events(conn, &added_events).await;

            Ok(())
        }
        .boxed()
    })
    .await?;

    Ok(())
}

fn get_provider_added_and_removed_events(
    already_ingested_events: &[Event],
    provider_events: &[Event],
) -> Option<(Vec<Event>, Vec<Event>)> {
    let already_ingested_events_set: HashSet<_> = already_ingested_events.iter().cloned().collect();
    let provider_events_set: HashSet<_> = provider_events.iter().cloned().collect();

    let added_events: Vec<_> = provider_events
        .iter()
        .filter(|e| !already_ingested_events_set.contains(e))
        .cloned()
        .collect();

    let removed_events: Vec<_> = already_ingested_events
        .iter()
        .filter(|e| !provider_events_set.contains(e))
        .cloned()
        .collect();

    if added_events.is_empty() && removed_events.is_empty() {
        None
    } else {
        Some((added_events, removed_events))
    }
}

fn get_earliest_block_number((added_events, removed_events): (&Vec<Event>, &Vec<Event>)) -> i64 {
    let earliest_added_event = added_events.iter().min_by_key(|e| e.block_number);
    let earliest_removed_event = removed_events.iter().min_by_key(|e| e.block_number);

    match (earliest_added_event, earliest_removed_event) {
        (Some(event), None) | (None, Some(event)) => event.block_number,
        (Some(earliest_added), Some(earliest_removed)) => {
            min(earliest_added.block_number, earliest_removed.block_number)
        }
        _ => unreachable!("Added Events or Removed Events must have at least one entry"),
    }
}