chaindexing 0.1.80

Index any EVM chain and query in SQL
Documentation
use std::collections::HashMap;

use crate::ChaindexingRepoClient;
use crate::{ChaindexingRepo, ChaindexingRepoTxnClient};
use crate::{ExecutesWithRawQuery, LoadsDataWithRawQuery};

use super::state_versions::{StateVersion, StateVersions, STATE_VERSIONS_UNIQUE_FIELDS};
use super::{serde_map_to_string_map, to_and_filters, to_columns_and_values};

pub struct StateViews;

impl StateViews {
    pub async fn refresh<'a>(
        state_version_group_ids: &[String],
        table_name: &str,
        client: &ChaindexingRepoTxnClient<'a>,
    ) {
        let latest_state_versions =
            StateVersions::get_latest(state_version_group_ids, table_name, client).await;

        for latest_state_version in latest_state_versions {
            StateView::refresh(&latest_state_version, table_name, client).await
        }
    }
}

pub struct StateView;

impl StateView {
    pub async fn get_complete<'a>(
        state_view: &HashMap<String, String>,
        table_name: &str,
        client: &ChaindexingRepoTxnClient<'a>,
    ) -> HashMap<String, String> {
        let query = format!(
            "SELECT * FROM {table_name} WHERE {filters}",
            filters = to_and_filters(state_view),
        );

        serde_map_to_string_map(
            &ChaindexingRepo::load_data_in_txn::<HashMap<String, serde_json::Value>>(
                client, &query,
            )
            .await
            .unwrap(),
        )
    }

    pub async fn refresh<'a>(
        latest_state_version: &HashMap<String, String>,
        table_name: &str,
        client: &ChaindexingRepoTxnClient<'a>,
    ) {
        let state_version_group_id = StateVersion::get_group_id(latest_state_version);

        if StateVersion::was_deleted(latest_state_version) {
            Self::delete(&state_version_group_id, table_name, client).await;

            // Publish delete state change
            #[cfg(feature = "live-states")]
            crate::state_bus::publish(crate::StateChange {
                table: table_name.to_string(),
                op: "delete".to_string(),
                state: HashMap::new(), // Empty for delete
                block: latest_state_version
                    .get("block_number")
                    .and_then(|b| b.parse().ok())
                    .unwrap_or(0),
                chain_id: latest_state_version
                    .get("chain_id")
                    .and_then(|c| c.parse().ok())
                    .unwrap_or(0),
            });
        } else {
            let new_state_view = Self::from_latest_state_version(latest_state_version);

            Self::delete(&state_version_group_id, table_name, client).await;
            Self::create(&new_state_view, table_name, client).await;

            // Publish upsert state change
            #[cfg(feature = "live-states")]
            crate::state_bus::publish(crate::StateChange {
                table: table_name.to_string(),
                op: "upsert".to_string(),
                state: new_state_view.clone(),
                block: latest_state_version
                    .get("block_number")
                    .and_then(|b| b.parse().ok())
                    .unwrap_or(0),
                chain_id: latest_state_version
                    .get("chain_id")
                    .and_then(|c| c.parse().ok())
                    .unwrap_or(0),
            });
        }
    }

    pub async fn refresh_without_txn(
        latest_state_version: &HashMap<String, String>,
        table_name: &str,
        client: &ChaindexingRepoClient,
    ) {
        let state_version_group_id = StateVersion::get_group_id(latest_state_version);

        if StateVersion::was_deleted(latest_state_version) {
            Self::delete_without_txn(&state_version_group_id, table_name, client).await;

            // Publish delete state change
            #[cfg(feature = "live-states")]
            crate::state_bus::publish(crate::StateChange {
                table: table_name.to_string(),
                op: "delete".to_string(),
                state: HashMap::new(), // Empty for delete
                block: latest_state_version
                    .get("block_number")
                    .and_then(|b| b.parse().ok())
                    .unwrap_or(0),
                chain_id: latest_state_version
                    .get("chain_id")
                    .and_then(|c| c.parse().ok())
                    .unwrap_or(0),
            });
        } else {
            let new_state_view = Self::from_latest_state_version(latest_state_version);

            Self::delete_without_txn(&state_version_group_id, table_name, client).await;
            Self::create_without_txn(&new_state_view, table_name, client).await;

            // Publish upsert state change
            #[cfg(feature = "live-states")]
            crate::state_bus::publish(crate::StateChange {
                table: table_name.to_string(),
                op: "upsert".to_string(),
                state: new_state_view.clone(),
                block: latest_state_version
                    .get("block_number")
                    .and_then(|b| b.parse().ok())
                    .unwrap_or(0),
                chain_id: latest_state_version
                    .get("chain_id")
                    .and_then(|c| c.parse().ok())
                    .unwrap_or(0),
            });
        }
    }

    fn from_latest_state_version(
        latest_state_version: &HashMap<String, String>,
    ) -> HashMap<String, String> {
        latest_state_version
            .clone()
            .into_iter()
            .filter(|(field, _value)| !STATE_VERSIONS_UNIQUE_FIELDS.contains(&field.as_str()))
            .collect()
    }

    async fn delete<'a>(
        state_version_group_id: &str,
        table_name: &str,
        client: &ChaindexingRepoTxnClient<'a>,
    ) {
        ChaindexingRepo::execute_in_txn(
            client,
            &Self::delete_query(state_version_group_id, table_name),
        )
        .await;
    }
    async fn delete_without_txn(
        state_version_group_id: &str,
        table_name: &str,
        client: &ChaindexingRepoClient,
    ) {
        ChaindexingRepo::execute(
            client,
            &Self::delete_query(state_version_group_id, table_name),
        )
        .await;
    }
    fn delete_query(state_version_group_id: &str, table_name: &str) -> String {
        format!(
            "DELETE FROM {table_name} WHERE state_version_group_id = '{state_version_group_id}'",
        )
    }

    async fn create<'a>(
        new_state_view: &HashMap<String, String>,
        table_name: &str,
        client: &ChaindexingRepoTxnClient<'a>,
    ) {
        ChaindexingRepo::execute_in_txn(client, &Self::create_query(new_state_view, table_name))
            .await;
    }
    async fn create_without_txn(
        new_state_view: &HashMap<String, String>,
        table_name: &str,
        client: &ChaindexingRepoClient,
    ) {
        ChaindexingRepo::execute(client, &Self::create_query(new_state_view, table_name)).await;
    }
    fn create_query(new_state_view: &HashMap<String, String>, table_name: &str) -> String {
        let (columns, values) = to_columns_and_values(new_state_view);
        format!(
            "INSERT INTO {table_name} ({columns}) VALUES ({values})",
            columns = columns.join(","),
            values = values.join(",")
        )
    }
}