// chaindexing 0.1.80
//
// Index any EVM chain and query in SQL.
// Documentation
use std::collections::HashMap;

use crate::{
    ChaindexingRepo, ChaindexingRepoTxnClient, ExecutesWithRawQuery, LoadsDataWithRawQuery,
};
use crate::{ChaindexingRepoClient, Event};

use super::{serde_map_to_string_map, to_columns_and_values};

/// Prefix prepended to a state table's name to form the name of the table
/// holding its versions (see `StateVersion::table_name`).
pub const STATE_VERSIONS_TABLE_PREFIX: &str = "chaindexing_state_versions_for_";
/// Names of two state-version columns.
///
/// NOTE(review): not referenced in this file; presumably these are the columns
/// that exist only on version rows and not on the state itself — confirm
/// against the callers.
pub const STATE_VERSIONS_UNIQUE_FIELDS: [&str; 2] =
    ["state_version_id", "state_version_is_deleted"];

/// Namespace for operations over collections of state version rows: loading
/// them, extracting their ids / group ids, and deleting them in bulk.
pub struct StateVersions;

impl StateVersions {
    /// Loads the state versions of `state_table_name` recorded for `chain_id`
    /// at or after `from_block_number`.
    ///
    /// The query runs through the given transaction `client`. Every returned
    /// row is converted to a column-name -> string-value map with
    /// `serde_map_to_string_map`.
    pub async fn get<'a>(
        from_block_number: i64,
        chain_id: i64,
        state_table_name: &str,
        client: &ChaindexingRepoTxnClient<'a>,
    ) -> Vec<HashMap<String, String>> {
        let table_name = StateVersion::table_name(state_table_name);
        let query = format!(
            "SELECT * FROM {table_name} 
            WHERE chain_id = {chain_id}
            AND block_number >= {from_block_number}"
        );

        let rows = ChaindexingRepo::load_data_list_in_txn::<HashMap<String, serde_json::Value>>(
            client, &query,
        )
        .await;

        rows.iter().map(serde_map_to_string_map).collect()
    }

    /// Collects the `state_version_id` of every given state version, in order.
    ///
    /// # Panics
    ///
    /// Panics if any state version has no `state_version_id` entry.
    pub fn get_ids(state_versions: &[HashMap<String, String>]) -> Vec<String> {
        let mut ids = Vec::with_capacity(state_versions.len());

        for state_version in state_versions {
            // Indexing panics on a missing key, which is the contract here.
            ids.push(state_version["state_version_id"].clone());
        }

        ids
    }

    /// Collects the `state_version_group_id` of every given state version, in
    /// order. Duplicates are kept.
    ///
    /// # Panics
    ///
    /// Panics if any state version has no `state_version_group_id` entry.
    pub fn get_group_ids(state_versions: &[HashMap<String, String>]) -> Vec<String> {
        state_versions
            .iter()
            .map(|row| row["state_version_group_id"].clone())
            .collect()
    }

    /// Deletes the state versions of `state_table_name` whose
    /// `state_version_id` is listed in `ids`, using the transaction `client`.
    ///
    /// Does nothing when `ids` is empty.
    ///
    /// The ids are interpolated into the statement verbatim (unquoted).
    /// NOTE(review): this presumes `state_version_id` is a numeric column and
    /// that `ids` never carries untrusted text — confirm against the schema.
    pub async fn delete_by_ids<'a>(
        ids: &[String],
        state_table_name: &str,
        client: &ChaindexingRepoTxnClient<'a>,
    ) {
        // An empty list would render `IN ()`, which is a SQL syntax error;
        // there is nothing to delete anyway.
        if ids.is_empty() {
            return;
        }

        let query = format!(
            "DELETE FROM {table_name}
            WHERE state_version_id IN ({ids})",
            table_name = StateVersion::table_name(state_table_name),
            ids = ids.join(",")
        );

        ChaindexingRepo::execute_in_txn(client, &query).await;
    }

    /// Loads, for every group in `group_ids`, its most recent state version
    /// from the versions table of `state_table_name`.
    ///
    /// "Most recent" is the row with the highest `block_number`, ties broken
    /// by the highest `log_index`. One row per group is returned, converted to
    /// a column-name -> string-value map with `serde_map_to_string_map`.
    /// Returns an empty list when `group_ids` is empty.
    pub async fn get_latest<'a>(
        group_ids: &[String],
        state_table_name: &str,
        client: &ChaindexingRepoTxnClient<'a>,
    ) -> Vec<HashMap<String, String>> {
        // An empty list would render `IN ()`, which is a SQL syntax error;
        // no groups means no versions to load.
        if group_ids.is_empty() {
            return Vec::new();
        }

        // `DISTINCT ON` keeps the first row of each group in ORDER BY order, so
        // both `block_number` and `log_index` must sort descending for that
        // first row to be the latest version. A trailing `DESC` on `log_index`
        // alone leaves `block_number` ascending and picks the oldest block.
        let query = format!(
            "SELECT DISTINCT ON (state_version_group_id) * FROM {table_name} 
            WHERE state_version_group_id IN ({group_ids}) 
            ORDER BY state_version_group_id, block_number DESC, log_index DESC",
            table_name = StateVersion::table_name(state_table_name),
            group_ids = group_ids.iter().map(|id| format!("'{id}'")).collect::<Vec<_>>().join(",")
        );

        ChaindexingRepo::load_data_list_in_txn::<HashMap<String, serde_json::Value>>(client, &query)
            .await
            .iter()
            .map(serde_map_to_string_map)
            .collect()
    }
}

/// Namespace for operations on a single state version row: naming its table,
/// reading its bookkeeping columns, and appending new versions (create,
/// update, delete) derived from an event.
pub struct StateVersion;

impl StateVersion {
    /// Returns the name of the versions table for `state_table_name`, i.e.
    /// `STATE_VERSIONS_TABLE_PREFIX` followed by the state table's name.
    pub fn table_name(state_table_name: &str) -> String {
        [STATE_VERSIONS_TABLE_PREFIX, state_table_name].concat()
    }

    /// Tells whether `state_version` is flagged as deleted, i.e. whether its
    /// `state_version_is_deleted` value is exactly `"true"`.
    ///
    /// # Panics
    ///
    /// Panics if `state_version` has no `state_version_is_deleted` entry.
    pub fn was_deleted(state_version: &HashMap<String, String>) -> bool {
        let is_deleted = &state_version["state_version_is_deleted"];

        is_deleted == "true"
    }

    /// Returns an owned copy of the `state_version_group_id` of
    /// `state_version`.
    ///
    /// # Panics
    ///
    /// Panics if `state_version` has no `state_version_group_id` entry.
    pub fn get_group_id(state_version: &HashMap<String, String>) -> String {
        state_version["state_version_group_id"].clone()
    }

    /// Appends the first version of `state`, within the transaction `client`.
    ///
    /// A freshly generated v4 UUID is stored as the version's
    /// `state_version_group_id` (replacing any such entry already in `state`).
    /// The row is completed with the fields taken from `event` and inserted;
    /// the inserted row is returned.
    pub async fn create<'a>(
        state: &HashMap<String, String>,
        state_table_name: &str,
        event: &Event,
        client: &ChaindexingRepoTxnClient<'a>,
    ) -> HashMap<String, String> {
        let group_id = uuid::Uuid::new_v4().to_string();

        // The group id comes last so it wins over a same-named entry in `state`.
        let first_version: HashMap<String, String> = state
            .iter()
            .map(|(column, value)| (column.clone(), value.clone()))
            .chain(std::iter::once(("state_version_group_id".to_owned(), group_id)))
            .collect();

        Self::append(&first_version, state_table_name, event, client).await
    }

    /// Appends a new version made of `state` overlaid with `updates`, within
    /// the transaction `client`.
    ///
    /// Where both maps carry the same column, the value from `updates` wins.
    /// The row is completed with the fields taken from `event` and inserted;
    /// the inserted row is returned.
    pub async fn update<'a>(
        state: &HashMap<String, String>,
        updates: &HashMap<String, String>,
        state_table_name: &str,
        event: &Event,
        client: &ChaindexingRepoTxnClient<'a>,
    ) -> HashMap<String, String> {
        // `updates` is chained after `state`, so its entries override.
        let updated_version: HashMap<String, String> = state
            .iter()
            .chain(updates.iter())
            .map(|(column, value)| (column.clone(), value.clone()))
            .collect();

        Self::append(&updated_version, state_table_name, event, client).await
    }
    /// Same as `update`, but runs the insert directly on `client` instead of
    /// through a transaction client.
    ///
    /// Where `state` and `updates` carry the same column, the value from
    /// `updates` wins. The inserted row is returned.
    pub async fn update_without_txn(
        state: &HashMap<String, String>,
        updates: &HashMap<String, String>,
        state_table_name: &str,
        event: &Event,
        client: &mut ChaindexingRepoClient,
    ) -> HashMap<String, String> {
        let mut updated_version = state.clone();
        for (column, value) in updates {
            updated_version.insert(column.clone(), value.clone());
        }

        Self::append_without_txn(&updated_version, state_table_name, event, client).await
    }

    /// Appends a version of `state` flagged as deleted, within the transaction
    /// `client`.
    ///
    /// The new version is `state` with `state_version_is_deleted` set to
    /// `"true"`, completed with the fields taken from `event`. The inserted
    /// row is returned.
    pub async fn delete<'a>(
        state: &HashMap<String, String>,
        state_table_name: &str,
        event: &Event,
        client: &ChaindexingRepoTxnClient<'a>,
    ) -> HashMap<String, String> {
        let deletion_flag = [("state_version_is_deleted".to_owned(), "true".to_owned())];

        let mut deleted_version = state.clone();
        deleted_version.extend(deletion_flag);

        Self::append(&deleted_version, state_table_name, event, client).await
    }
    /// Same as `delete`, but runs the insert directly on `client` instead of
    /// through a transaction client.
    ///
    /// The new version is `state` with `state_version_is_deleted` set to
    /// `"true"`. The inserted row is returned.
    pub async fn delete_without_txn(
        state: &HashMap<String, String>,
        state_table_name: &str,
        event: &Event,
        client: &ChaindexingRepoClient,
    ) -> HashMap<String, String> {
        // The flag is chained last so it overrides any value already in `state`.
        let deleted_version: HashMap<String, String> = state
            .iter()
            .map(|(column, value)| (column.clone(), value.clone()))
            .chain(std::iter::once((
                "state_version_is_deleted".to_owned(),
                "true".to_owned(),
            )))
            .collect();

        Self::append_without_txn(&deleted_version, state_table_name, event, client).await
    }

    /// Inserts a new state version through the transaction `client` and
    /// returns the inserted row as a column-name -> string-value map.
    ///
    /// The statement is built by `append_query` from `partial_state_version`
    /// and the fields taken from `event`.
    ///
    /// # Panics
    ///
    /// Panics when unwrapping the result of `load_data_in_txn` fails.
    async fn append<'a>(
        partial_state_version: &HashMap<String, String>,
        state_table_name: &str,
        event: &Event,
        client: &ChaindexingRepoTxnClient<'a>,
    ) -> HashMap<String, String> {
        let insert_query = Self::append_query(partial_state_version, state_table_name, event);

        let inserted_row = ChaindexingRepo::load_data_in_txn::<HashMap<String, serde_json::Value>>(
            client,
            &insert_query,
        )
        .await
        .unwrap();

        serde_map_to_string_map(&inserted_row)
    }

    /// Inserts a new state version directly on `client` (no transaction
    /// client) and returns the inserted row as a column-name -> string-value
    /// map.
    ///
    /// The statement is built by `append_query` from `partial_state_version`
    /// and the fields taken from `event`.
    ///
    /// # Panics
    ///
    /// Panics when unwrapping the result of `load_data` fails.
    async fn append_without_txn(
        partial_state_version: &HashMap<String, String>,
        state_table_name: &str,
        event: &Event,
        client: &ChaindexingRepoClient,
    ) -> HashMap<String, String> {
        let insert_query = Self::append_query(partial_state_version, state_table_name, event);

        let inserted_row =
            ChaindexingRepo::load_data::<HashMap<String, serde_json::Value>>(client, &insert_query)
                .await
                .unwrap();

        serde_map_to_string_map(&inserted_row)
    }

    /// Builds the `INSERT ... RETURNING *` statement that appends a state
    /// version to the versions table of `state_table_name`.
    ///
    /// The inserted row is `partial_state_version` overlaid with the fields
    /// taken from `event` (event fields win on a name clash). Column and value
    /// lists come from `to_columns_and_values` and are joined with commas.
    fn append_query(
        partial_state_version: &HashMap<String, String>,
        state_table_name: &str,
        event: &Event,
    ) -> String {
        let mut row = partial_state_version.clone();
        for (column, value) in Self::extract_part_from_event(event) {
            row.insert(column, value);
        }

        let (columns, values) = to_columns_and_values(&row);
        let table_name = Self::table_name(state_table_name);
        let columns = columns.join(",");
        let values = values.join(",");

        format!(
            "INSERT INTO {table_name} ({columns}) VALUES ({values})
            RETURNING *"
        )
    }

    /// Returns the columns of a state version that are copied from `event`:
    /// contract address, chain id, transaction hash and index, log index,
    /// block number and block hash, all as strings.
    fn extract_part_from_event(event: &Event) -> HashMap<String, String> {
        let event_fields: [(&str, String); 7] = [
            ("contract_address", event.contract_address.to_owned()),
            ("chain_id", event.chain_id.to_string()),
            ("transaction_hash", event.transaction_hash.to_owned()),
            ("transaction_index", event.transaction_index.to_string()),
            ("log_index", event.log_index.to_string()),
            ("block_number", event.block_number.to_string()),
            ("block_hash", event.block_hash.to_owned()),
        ];

        event_fields
            .into_iter()
            .map(|(column, value)| (column.to_string(), value))
            .collect()
    }
}