chaindexing 0.1.80

Index any EVM chain and query in SQL
Documentation
use tokio_postgres::{types::ToSql, Client, NoTls, Transaction};

use crate::chain_reorg::ReorgedBlock;
use crate::events::PartialEvent;
use crate::nodes::Node;
use crate::{root, Event, UnsavedContractAddress};
use crate::{ExecutesWithRawQuery, HasRawQueryClient, LoadsDataWithRawQuery, PostgresRepo};
use serde::de::DeserializeOwned;

/// Alias for the `tokio_postgres` [`Client`] used for raw queries issued outside a transaction.
pub type PostgresRepoClient = Client;
/// Alias for the `tokio_postgres` [`Transaction`] used for raw queries issued inside a
/// transaction; `'a` is the lifetime parameter of the underlying [`Transaction`].
pub type PostgresRepoTxnClient<'a> = Transaction<'a>;

#[crate::augmenting_std::async_trait]
impl HasRawQueryClient for PostgresRepo {
    type RawQueryClient = Client;
    type RawQueryTxnClient<'a> = Transaction<'a>;

    /// Opens a new connection to `self.url` without TLS and returns its client half.
    ///
    /// The connection half is moved onto a spawned Tokio task that drives it; if the
    /// connection ends with an error, that error is printed to stderr.
    ///
    /// # Panics
    ///
    /// Panics if the connection cannot be established.
    async fn get_client(&self) -> Self::RawQueryClient {
        let connection_attempt = tokio_postgres::connect(&self.url, NoTls).await;
        let (client, connection) = connection_attempt.unwrap();

        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("connection error: {e}");
            }
        });

        client
    }

    /// Starts a transaction on `client` and returns the transaction handle.
    ///
    /// # Panics
    ///
    /// Panics if the transaction cannot be started.
    async fn get_txn_client<'a>(
        client: &'a mut Self::RawQueryClient,
    ) -> Self::RawQueryTxnClient<'a> {
        let txn_attempt = client.transaction().await;
        txn_attempt.unwrap()
    }
}

/// Write-side raw SQL operations for the Postgres repo.
///
/// Every method builds its SQL text with `format!` and panics (via `unwrap`) if the
/// database rejects the statement.
///
/// NOTE(review): string values (addresses, contract names) are interpolated into the
/// SQL text without escaping; callers must only pass trusted values until these
/// statements are moved to bind parameters.
#[crate::augmenting_std::async_trait]
impl ExecutesWithRawQuery for PostgresRepo {
    /// Executes `query` on `client` with no bind parameters.
    ///
    /// # Panics
    ///
    /// Panics if the statement fails.
    async fn execute(client: &Self::RawQueryClient, query: &str) {
        client.execute(query, &[] as &[&(dyn ToSql + Sync)]).await.unwrap();
    }

    /// Executes `query` inside the transaction `txn_client` with no bind parameters.
    ///
    /// # Panics
    ///
    /// Panics if the statement fails.
    async fn execute_in_txn<'a>(txn_client: &Self::RawQueryTxnClient<'a>, query: &str) {
        txn_client.execute(query, &[] as &[&(dyn ToSql + Sync)]).await.unwrap();
    }

    /// Consumes the transaction and commits it.
    ///
    /// # Panics
    ///
    /// Panics if the commit fails.
    async fn commit_txns<'a>(client: Self::RawQueryTxnClient<'a>) {
        client.commit().await.unwrap();
    }

    /// Inserts all `contract_addresses` in a single statement, skipping rows that
    /// conflict on `(chain_id, address)`.
    ///
    /// Each row's `start_block_number` also seeds `next_block_number_to_handle_from`
    /// and `next_block_number_to_ingest_from`.
    ///
    /// An empty slice is a no-op.
    async fn create_contract_addresses(
        client: &Self::RawQueryClient,
        contract_addresses: &[UnsavedContractAddress],
    ) {
        // An empty `VALUES` list is a SQL syntax error, so there is nothing to send.
        if contract_addresses.is_empty() {
            return;
        }

        let contract_addresses_values = contract_addresses
            .iter()
            .map(
                |UnsavedContractAddress {
                     address,
                     chain_id,
                     contract_name,
                     start_block_number,
                     ..
                 }| {
                    format!("('{address}', {chain_id}, '{contract_name}', {start_block_number}, {start_block_number}, {start_block_number})")
                },
            )
            .collect::<Vec<_>>()
            .join(",");

        let query = format!("
            INSERT INTO chaindexing_contract_addresses 
            (address, chain_id, contract_name, next_block_number_to_handle_from, next_block_number_to_ingest_from, start_block_number)
            VALUES {contract_addresses_values}
            ON CONFLICT (chain_id, address)
            DO NOTHING
        ");

        Self::execute(client, &query).await;
    }

    /// Inserts a single contract address inside a transaction, skipping it if a row
    /// with the same `(chain_id, address)` already exists.
    ///
    /// `start_block_number` also seeds `next_block_number_to_handle_from` and
    /// `next_block_number_to_ingest_from`.
    async fn create_contract_address<'a>(
        client: &Self::RawQueryTxnClient<'a>,
        contract_address: &UnsavedContractAddress,
    ) {
        let address = &contract_address.address;
        let contract_name = &contract_address.contract_name;
        let chain_id = contract_address.chain_id;
        let start_block_number = contract_address.start_block_number;

        let query = format!(
            "INSERT INTO chaindexing_contract_addresses 
            (address, chain_id, contract_name, next_block_number_to_handle_from, next_block_number_to_ingest_from, start_block_number)
            VALUES ('{address}', {chain_id}, '{contract_name}', {start_block_number}, {start_block_number}, {start_block_number})
            ON CONFLICT (chain_id, address)
            DO NOTHING"
        );

        Self::execute_in_txn(client, &query).await;
    }

    /// Sets `next_block_number_to_handle_from` to `block_number` for the contract
    /// address matching both `chain_id` and `address`.
    async fn update_next_block_number_to_handle_from<'a>(
        client: &Self::RawQueryTxnClient<'a>,
        address: &str,
        chain_id: u64,
        block_number: u64,
    ) {
        let query = format!(
            "UPDATE chaindexing_contract_addresses
        SET next_block_number_to_handle_from = {block_number}
        WHERE chain_id = {chain_id} AND address = '{address}'"
        );

        Self::execute_in_txn(client, &query).await;
    }

    /// Sets `next_block_number_to_handle_from` to `block_number` for every contract
    /// address on `chain_id`.
    async fn update_next_block_numbers_to_handle_from<'a>(
        client: &Self::RawQueryTxnClient<'a>,
        chain_id: u64,
        block_number: u64,
    ) {
        let query = format!(
            "UPDATE chaindexing_contract_addresses
        SET next_block_number_to_handle_from = {block_number}
        WHERE chain_id = {chain_id}"
        );

        Self::execute_in_txn(client, &query).await;
    }

    /// Sets `next_block_number_for_side_effects` to `block_number` for the contract
    /// address matching both `chain_id` and `address`.
    async fn update_next_block_number_for_side_effects<'a>(
        client: &Self::RawQueryTxnClient<'a>,
        address: &str,
        chain_id: u64,
        block_number: u64,
    ) {
        let query = format!(
            "UPDATE chaindexing_contract_addresses
        SET next_block_number_for_side_effects = {block_number}
        WHERE chain_id = {chain_id} AND address = '{address}'"
        );

        Self::execute_in_txn(client, &query).await;
    }

    /// Stamps `handled_at` with the current UTC Unix timestamp on every reorged block
    /// whose id is in `reorged_block_ids`.
    ///
    /// An empty slice is a no-op.
    async fn update_reorged_blocks_as_handled<'a>(
        client: &Self::RawQueryTxnClient<'a>,
        reorged_block_ids: &[i32],
    ) {
        // `IN ()` is a SQL syntax error, so skip the statement when there are no ids.
        if reorged_block_ids.is_empty() {
            return;
        }

        let query = format!(
            "UPDATE chaindexing_reorged_blocks
            SET handled_at = '{handled_at}'
            WHERE id IN ({reorged_block_ids})",
            reorged_block_ids = join_numbers_with_comma(reorged_block_ids),
            handled_at = chrono::Utc::now().timestamp(),
        );

        Self::execute_in_txn(client, &query).await;
    }

    /// Appends a new row to `chaindexing_root_states` carrying the two reset counters
    /// of `new_root_state`.
    async fn append_root_state(client: &Self::RawQueryClient, new_root_state: &root::State) {
        let reset_count = new_root_state.reset_count;
        let reset_including_side_effects_count = new_root_state.reset_including_side_effects_count;

        let query = format!(
            "INSERT INTO chaindexing_root_states
            (reset_count, reset_including_side_effects_count)
            VALUES ('{reset_count}', '{reset_including_side_effects_count}')"
        );

        Self::execute(client, &query).await;
    }

    /// Deletes events on `chain_id` whose block number is strictly below
    /// `min_block_number`.
    async fn prune_events(client: &Self::RawQueryClient, min_block_number: u64, chain_id: u64) {
        let query = format!(
            "DELETE FROM chaindexing_events
            WHERE block_number < {min_block_number}
            AND chain_id = {chain_id}
            "
        );

        Self::execute(client, &query).await;
    }

    /// Deletes all nodes except the `retain_size` rows with the highest ids.
    async fn prune_nodes(client: &Self::RawQueryClient, retain_size: u16) {
        let query = format!(
            "DELETE FROM chaindexing_nodes
            WHERE id NOT IN (
                SELECT id
                FROM chaindexing_nodes
                ORDER BY id DESC
                LIMIT {retain_size}
            )
            "
        );

        Self::execute(client, &query).await;
    }

    /// Deletes all root states except the `retain_size` rows with the highest ids.
    async fn prune_root_states(client: &Self::RawQueryClient, retain_size: u64) {
        let query = format!(
            "DELETE FROM chaindexing_root_states
            WHERE id NOT IN (
                SELECT id
                FROM chaindexing_root_states
                ORDER BY id DESC
                LIMIT {retain_size}
            )
            "
        );

        Self::execute(client, &query).await;
    }
}

/// Read-side raw SQL operations for the Postgres repo.
///
/// Rows are fetched as a single JSON aggregate and deserialized with `serde_json`;
/// methods panic (via `unwrap`) if the query fails or the rows do not deserialize
/// into the requested type.
#[crate::augmenting_std::async_trait]
impl LoadsDataWithRawQuery for PostgresRepo {
    /// Inserts a node row populated with column defaults and returns it.
    ///
    /// # Panics
    ///
    /// Panics if the insert returns no row.
    async fn create_and_load_new_node(client: &Self::RawQueryClient) -> Node {
        Self::load_data(
            client,
            "INSERT INTO chaindexing_nodes DEFAULT VALUES RETURNING *",
        )
        .await
        .unwrap()
    }

    /// Returns the root state with the highest id, or `None` if the table is empty.
    async fn load_last_root_state(client: &Self::RawQueryClient) -> Option<root::State> {
        Self::load_data(
            client,
            "SELECT * FROM chaindexing_root_states
            ORDER BY id DESC
            LIMIT 1",
        )
        .await
    }

    /// Loads up to `limit` events for `contract_address` on `chain_id`, starting at
    /// `from_block_number` (inclusive), ordered by block number then log index.
    async fn load_events(
        client: &Self::RawQueryClient,
        chain_id: u64,
        contract_address: &str,
        from_block_number: u64,
        limit: u64,
    ) -> Vec<Event> {
        let query = format!(
            "SELECT * from chaindexing_events
            WHERE chain_id = {chain_id} AND contract_address= '{contract_address}'
            AND block_number >= {from_block_number} 
            ORDER BY block_number ASC, log_index ASC
            LIMIT {limit}",
        );

        Self::load_data_list(client, &query).await
    }

    /// Loads, for each of `addresses`, the single event with the highest
    /// `(block_number, log_index)`.
    ///
    /// Returns an empty `Vec` without querying when `addresses` is empty.
    async fn load_latest_events(
        client: &Self::RawQueryClient,
        addresses: &[String],
    ) -> Vec<PartialEvent> {
        // `IN ()` is a SQL syntax error; no addresses means no events to look up.
        if addresses.is_empty() {
            return vec![];
        }

        let query = format!(
            "WITH EventsWithRowNumbers AS (
                SELECT
                    *,
                    ROW_NUMBER() OVER (PARTITION BY contract_address ORDER BY block_number DESC, log_index DESC) AS row_no
                FROM
                    chaindexing_events
                WHERE
                contract_address IN ({addresses})
            )
            SELECT
                *
            FROM
                EventsWithRowNumbers
            WHERE
                row_no = 1",
                addresses = join_strings_with_comma(addresses),
        );

        Self::load_data_list(client, &query).await
    }

    /// Loads every reorged block whose `handled_at` is still `NULL`.
    async fn load_unhandled_reorged_blocks(client: &Self::RawQueryClient) -> Vec<ReorgedBlock> {
        Self::load_data_list(
            client,
            "SELECT * FROM chaindexing_reorged_blocks WHERE handled_at is NULL",
        )
        .await
    }

    /// Runs `query` and returns its only row, or `None` if it produced no rows.
    ///
    /// # Panics
    ///
    /// Panics if `query` yields more than one row.
    async fn load_data<Data: Send + DeserializeOwned>(
        client: &Self::RawQueryClient,
        query: &str,
    ) -> Option<Data> {
        let mut data_list: Vec<Data> = Self::load_data_list(client, query).await;

        assert!(data_list.len() <= 1);

        data_list.pop()
    }

    /// Transaction variant of `load_data`: returns the only row of `query`, or `None`.
    ///
    /// # Panics
    ///
    /// Panics if `query` yields more than one row.
    async fn load_data_in_txn<'a, Data: Send + DeserializeOwned>(
        client: &Self::RawQueryTxnClient<'a>,
        query: &str,
    ) -> Option<Data> {
        let mut data_list: Vec<Data> = Self::load_data_list_in_txn(client, query).await;

        assert!(data_list.len() <= 1);

        data_list.pop()
    }

    /// Runs `query` and deserializes all of its rows into `Data`.
    ///
    /// Returns an empty `Vec` when the JSON aggregate is neither an object nor an
    /// array.
    ///
    /// # Panics
    ///
    /// Panics if the query fails or the aggregate cannot be deserialized.
    async fn load_data_list<Data: Send + DeserializeOwned>(
        client: &Self::RawQueryClient,
        query: &str,
    ) -> Vec<Data> {
        let json_aggregate = get_json_aggregate(client, query).await;

        if json_aggregate.is_object() || json_aggregate.is_array() {
            serde_json::from_value(json_aggregate).unwrap()
        } else {
            vec![]
        }
    }

    /// Transaction variant of `load_data_list`: runs `query` inside `txn_client` and
    /// deserializes all of its rows into `Data`.
    ///
    /// Returns an empty `Vec` when the JSON aggregate is neither an object nor an
    /// array.
    ///
    /// # Panics
    ///
    /// Panics if the query fails or the aggregate cannot be deserialized.
    async fn load_data_list_in_txn<'a, Data: Send + DeserializeOwned>(
        txn_client: &Self::RawQueryTxnClient<'a>,
        query: &str,
    ) -> Vec<Data> {
        let json_aggregate = get_json_aggregate_in_txn(txn_client, query).await;

        if json_aggregate.is_object() || json_aggregate.is_array() {
            serde_json::from_value(json_aggregate).unwrap()
        } else {
            vec![]
        }
    }
}

/// Runs `query` wrapped by `json_aggregate_query` and returns the first column of the
/// first result row as a JSON value.
///
/// # Panics
///
/// Panics if the query fails or returns no rows.
async fn get_json_aggregate(client: &PostgresRepoClient, query: &str) -> serde_json::Value {
    let wrapped_query = json_aggregate_query(query);
    let rows = client.query(wrapped_query.as_str(), &[]).await.unwrap();
    let first_row = rows.first().unwrap();

    first_row.get(0)
}

/// Transaction variant of `get_json_aggregate`: runs the wrapped `query` inside
/// `txn_client` and returns the first column of the first result row as a JSON value.
///
/// # Panics
///
/// Panics if the query fails or returns no rows.
async fn get_json_aggregate_in_txn<'a>(
    txn_client: &PostgresRepoTxnClient<'a>,
    query: &str,
) -> serde_json::Value {
    let wrapped_query = json_aggregate_query(query);
    let rows = txn_client.query(wrapped_query.as_str(), &[]).await.unwrap();
    let first_row = rows.first().unwrap();

    first_row.get(0)
}

/// Wraps `query` in a CTE named `result` and selects all of its rows as one JSON
/// array, falling back to an empty JSON array when the aggregate is `NULL`.
fn json_aggregate_query(query: &str) -> String {
    const PREFIX: &str = "WITH result AS (";
    const SUFFIX: &str = ") SELECT COALESCE(json_agg(result), '[]'::json) FROM result";

    let mut wrapped = String::with_capacity(PREFIX.len() + query.len() + SUFFIX.len());
    wrapped.push_str(PREFIX);
    wrapped.push_str(query);
    wrapped.push_str(SUFFIX);
    wrapped
}

/// Renders each element with `to_string` and joins the results with `,` (no spaces).
///
/// Returns an empty string for an empty slice.
fn join_numbers_with_comma(numbers: &[impl ToString]) -> String {
    let mut joined = String::new();

    for (position, number) in numbers.iter().enumerate() {
        if position > 0 {
            joined.push(',');
        }
        joined.push_str(&number.to_string());
    }

    joined
}

/// Renders each string as a single-quoted SQL string literal and joins the literals
/// with `,` (no spaces), for use inside an `IN (...)` list.
///
/// Embedded single quotes are doubled (`'` becomes `''`) so a value cannot terminate
/// its literal early. Returns an empty string for an empty slice.
fn join_strings_with_comma(strings: &[String]) -> String {
    strings
        .iter()
        .map(|string| format!("'{}'", string.replace('\'', "''")))
        .collect::<Vec<_>>()
        .join(",")
}