mod chain_reorg;
mod chains;
mod config;
mod contracts;
mod diesel;
mod handlers;
mod nodes;
mod pruning;
mod repos;
mod root;
pub mod augmenting_std;
#[cfg(feature = "live-states")]
pub mod state_bus;
pub use chains::{Chain, ChainId};
pub use config::{Config, OptimizationConfig};
pub use contracts::{Contract, ContractAddress, EventAbi};
pub use events::{Event, EventParam};
pub use handlers::{
PureHandler as EventHandler, PureHandlerContext as EventContext, SideEffectHandler,
SideEffectHandlerContext as SideEffectContext,
};
pub use nodes::NodeHeartbeat as Heartbeat;
pub use chaindexing_macros::state_migrations;
pub use ethers::types::{I256, U256};
use tokio::sync::Mutex;
pub mod states;
pub type Address = ethers::types::Address;
pub type Bytes = Vec<u8>;
#[cfg(feature = "postgres")]
pub use repos::PostgresRepo;
#[cfg(feature = "live-states")]
pub use state_bus::subscribe as subscribe_state_changes;
use serde::Serialize;
use std::collections::HashMap;
/// A serializable record describing one change to an indexed state.
///
/// NOTE(review): presumably this is the value delivered to subscribers of the
/// `live-states` feature (`subscribe_state_changes`) — confirm against `state_bus`.
#[derive(Clone, Debug, Serialize)]
pub struct StateChange {
// Name of the table the change refers to.
pub table: String,
// `op` labels the operation that produced the change (exact set of labels is
// defined by the producer — TODO confirm); `state` holds the state's fields as
// string keys mapped to string values.
pub op: String, pub state: HashMap<String, String>,
// Block number associated with the change (presumably the block of the
// triggering event — confirm with the producer).
pub block: i64,
// Identifier of the chain associated with the change.
pub chain_id: i64,
}
#[doc(hidden)]
pub mod booting;
#[doc(hidden)]
pub mod deferred_futures;
#[doc(hidden)]
pub mod events;
#[doc(hidden)]
pub mod ingester;
#[doc(hidden)]
pub use contracts::{ContractEvent, UnsavedContractAddress};
#[doc(hidden)]
pub use ingester::Provider as IngesterProvider;
#[doc(hidden)]
pub use repos::*;
#[doc(hidden)]
#[cfg(feature = "postgres")]
pub use repos::{PostgresRepoConn, PostgresRepoPool};
#[cfg(feature = "postgres")]
#[doc(hidden)]
pub type ChaindexingRepo = PostgresRepo;
#[cfg(feature = "postgres")]
#[doc(hidden)]
pub type ChaindexingRepoPool = PostgresRepoPool;
#[cfg(feature = "postgres")]
#[doc(hidden)]
pub type ChaindexingRepoConn<'a> = PostgresRepoConn<'a>;
#[cfg(feature = "postgres")]
#[doc(hidden)]
pub type ChaindexingRepoClient = PostgresRepoClient;
#[cfg(feature = "postgres")]
#[doc(hidden)]
pub type ChaindexingRepoTxnClient<'a> = PostgresRepoTxnClient<'a>;
#[cfg(feature = "postgres")]
#[doc(hidden)]
pub use repos::PostgresRepoAsyncConnection as ChaindexingRepoAsyncConnection;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use config::ConfigError;
use nodes::NodeTasks;
use crate::nodes::{NodeTask, NodeTasksRunner};
pub(crate) type ChaindexingRepoClientMutex = Arc<Mutex<PostgresRepoClient>>;
/// Error type returned by [`index_states`].
pub enum ChaindexingError {
/// Wraps a [`ConfigError`]; produced through the `From<ConfigError>` conversion,
/// which lets `?` lift configuration errors into this type.
Config(ConfigError),
}
impl From<ConfigError> for ChaindexingError {
fn from(value: ConfigError) -> Self {
ChaindexingError::Config(value)
}
}
impl Debug for ChaindexingError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ChaindexingError::Config(config_error) => {
write!(f, "Config Error: {config_error:?}")
}
}
}
}
/// Validates `config`, boots this node, and starts indexing in a background task.
///
/// The steps run in this order:
/// 1. `config.validate()` is checked; a failure is returned immediately.
/// 2. A repo client is obtained, `booting::setup_nodes` runs, and a new node
///    record is created and loaded for this process.
/// 3. The function sleeps for one node-election interval
///    (`config.get_node_election_rate_ms()`) via
///    `wait_for_non_leader_nodes_to_abort`.
/// 4. `booting::setup` runs; a failure is returned.
/// 5. A Tokio task is spawned that, on every election interval, marks this node
///    as active, loads the currently active nodes, and lets [`NodeTasks`]
///    orchestrate the tasks produced by `get_tasks_runner`.
///
/// The function returns `Ok(())` as soon as the task has been spawned. The
/// task's `JoinHandle` is dropped, so the loop keeps running detached and is
/// never awaited here.
///
/// # Errors
///
/// Returns a [`ChaindexingError`] when `config.validate()` or `booting::setup`
/// fails (both are propagated with `?`).
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime.
pub async fn index_states<S: Send + Sync + Clone + Debug + 'static>(
    config: &Config<S>,
) -> Result<(), ChaindexingError> {
    config.validate()?;

    let client = config.repo.get_client().await;
    booting::setup_nodes(config, &client).await;
    let current_node = ChaindexingRepo::create_and_load_new_node(&client).await;

    wait_for_non_leader_nodes_to_abort(config.get_node_election_rate_ms()).await;

    booting::setup(config, &client).await?;

    // The spawned task must be `'static`, so it owns its own copy of the config.
    let config = config.clone();
    tokio::spawn(async move {
        let mut interval =
            time::interval(Duration::from_millis(config.get_node_election_rate_ms()));

        // A dedicated single-connection pool for the heartbeat/election loop.
        let pool = config.repo.get_pool(1).await;
        let mut conn = ChaindexingRepo::get_conn(&pool).await;
        let conn = &mut conn;

        let mut node_tasks = NodeTasks::new(&current_node);

        loop {
            // Refresh this node's heartbeat before reading the active set, so
            // the node is included in its own view of active nodes.
            ChaindexingRepo::keep_node_active(conn, &current_node).await;

            let active_nodes =
                ChaindexingRepo::get_active_nodes(conn, config.get_node_election_rate_ms()).await;

            node_tasks
                .orchestrate(
                    &config.optimization_config,
                    &active_nodes,
                    &get_tasks_runner(&config),
                )
                .await;

            // NOTE: the first tick of a Tokio interval completes immediately,
            // so the first two iterations run back to back.
            interval.tick().await;
        }
    });

    Ok(())
}
/// Registers `address` as an address of the contract named `contract_name`.
///
/// The chain id and the starting block number are taken from the event held by
/// `event_context`. The resulting [`UnsavedContractAddress`] is persisted with
/// `ChaindexingRepo::create_contract_address`, using the context's client.
pub async fn include_contract<'a, C: handlers::HandlerContext<'a>>(
    event_context: &C,
    contract_name: &str,
    address: &str,
) {
    let triggering_event = event_context.get_event();

    // The new address is scoped to the triggering event's chain and starts at
    // the triggering event's block.
    let unsaved_address = UnsavedContractAddress::new(
        contract_name,
        address,
        &triggering_event.get_chain_id(),
        triggering_event.get_block_number(),
    );

    ChaindexingRepo::create_contract_address(event_context.get_client(), &unsaved_address).await;
}
/// Sleeps asynchronously for `node_election_rate_ms` milliseconds.
///
/// NOTE(review): per the name, this pause is meant to give non-leader nodes
/// time to abort their tasks; the function itself only sleeps.
async fn wait_for_non_leader_nodes_to_abort(node_election_rate_ms: u64) {
    let election_interval = Duration::from_millis(node_election_rate_ms);
    time::sleep(election_interval).await;
}
/// Builds a [`NodeTasksRunner`] that borrows `config`.
///
/// Each call to the runner's `run` starts the ingester first and the handlers
/// second, and returns the two resulting tasks in that order.
fn get_tasks_runner<S: Sync + Send + Debug + Clone + 'static>(
    config: &Config<S>,
) -> impl NodeTasksRunner + '_ {
    // Private adapter: the only state it needs is a borrow of the config.
    struct ConfigBoundRunner<'cfg, S>
    where
        S: Send + Sync + Clone + Debug + 'static,
    {
        config: &'cfg Config<S>,
    }

    #[crate::augmenting_std::async_trait]
    impl<'cfg, S> NodeTasksRunner for ConfigBoundRunner<'cfg, S>
    where
        S: Send + Sync + Clone + Debug + 'static,
    {
        async fn run(&self) -> Vec<NodeTask> {
            let shared_config = self.config;

            // Elements are evaluated left to right: ingester, then handlers.
            vec![
                ingester::start(shared_config).await,
                handlers::start(shared_config).await,
            ]
        }
    }

    ConfigBoundRunner { config }
}
/// Convenience re-exports of the crate's commonly used items, intended for a
/// single glob import of `prelude::*`.
///
/// It re-exports chain, config, contract, event, handler, heartbeat and state
/// types under the same public names and aliases used at the crate root, plus
/// `async_trait`/`serde` from `augmenting_std`, the `state_migrations` macro,
/// and the `I256`/`U256` integer types from `ethers`.
pub mod prelude {
pub use crate::augmenting_std::{async_trait, serde};
pub use crate::chains::{Chain, ChainId};
pub use crate::config::{Config, OptimizationConfig};
pub use crate::contracts::{Contract, ContractAddress, EventAbi};
pub use crate::events::{Event, EventParam};
// Handler traits and contexts are exposed under their shorter public aliases.
pub use crate::handlers::{
PureHandler as EventHandler, PureHandlerContext as EventContext, SideEffectHandler,
SideEffectHandlerContext as SideEffectContext,
};
pub use crate::nodes::NodeHeartbeat as Heartbeat;
pub use crate::states::{
ChainState, ContractState, Filters, MultiChainState, StateMigrations, Updates,
};
pub use crate::Address;
pub use chaindexing_macros::state_migrations;
pub use ethers::types::{I256, U256};
}