use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use crate::error::IndexerError;
use crate::types::{BlockSummary, IndexContext};
/// A decoded event as passed to [`EventHandler::handle`].
///
/// The type derives `Serialize`/`Deserialize`, so it can be round-tripped
/// through serde, and `Clone`/`Debug` for convenience.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DecodedEvent {
/// Name of the chain the event belongs to (the tests in this file use `"ethereum"`).
pub chain: String,
/// Schema name of the event. `HandlerRegistry::dispatch_event` uses this
/// value as the lookup key for registered handlers.
pub schema: String,
/// Address associated with the event.
/// NOTE(review): presumably the emitting contract address — confirm against the decoder.
pub address: String,
/// Transaction hash associated with the event.
/// NOTE(review): presumably the transaction that produced the log — confirm.
pub tx_hash: String,
/// Block number associated with the event.
/// NOTE(review): presumably the block that contains the log — confirm.
pub block_number: u64,
/// Log index of the event.
/// NOTE(review): presumably the position of the log within its block — confirm.
pub log_index: u32,
/// Event payload as an arbitrary JSON value.
/// NOTE(review): presumably the decoded event fields keyed by name — confirm.
pub fields_json: serde_json::Value,
}
/// Asynchronous handler for decoded events of one schema.
///
/// Implementors must be `Send + Sync`; `HandlerRegistry` stores them as
/// `Arc<dyn EventHandler>`.
#[async_trait]
pub trait EventHandler: Send + Sync {
/// Processes a single decoded event.
///
/// When called through `HandlerRegistry::dispatch_event`, returning `Err`
/// stops the dispatch and the error is propagated to the caller.
async fn handle(&self, event: &DecodedEvent, ctx: &IndexContext) -> Result<(), IndexerError>;
/// Name of the schema this handler wants to receive.
///
/// `HandlerRegistry::on_event` reads this once, at registration time, to
/// decide which schema the handler is filed under.
fn schema_name(&self) -> &str;
}
/// Asynchronous handler invoked with a block summary.
///
/// Implementors must be `Send + Sync`; `HandlerRegistry` stores them as
/// `Arc<dyn BlockHandler>`.
#[async_trait]
pub trait BlockHandler: Send + Sync {
/// Processes one block.
///
/// When called through `HandlerRegistry::dispatch_block`, returning `Err`
/// stops the dispatch and the error is propagated to the caller.
async fn handle_block(
&self,
block: &BlockSummary,
ctx: &IndexContext,
) -> Result<(), IndexerError>;
}
/// Asynchronous handler invoked when a reorg is dispatched.
///
/// Implementors must be `Send + Sync`; `HandlerRegistry` stores them as
/// `Arc<dyn ReorgHandler>`.
#[async_trait]
pub trait ReorgHandler: Send + Sync {
/// Reacts to a reorg.
///
/// `dropped` is the slice passed to `HandlerRegistry::dispatch_reorg`.
/// NOTE(review): presumably the blocks removed from the canonical chain —
/// confirm ordering and contents against the caller.
///
/// When called through `HandlerRegistry::dispatch_reorg`, returning `Err`
/// stops the dispatch and the error is propagated to the caller.
async fn on_reorg(
&self,
dropped: &[BlockSummary],
ctx: &IndexContext,
) -> Result<(), IndexerError>;
}
/// Collection of registered event, block and reorg handlers.
///
/// Handlers are added with `on_event` / `on_block` / `on_reorg` and invoked
/// with the matching `dispatch_*` method.
pub struct HandlerRegistry {
// Event handlers grouped by the schema name they reported at registration;
// each bucket keeps its handlers in registration order.
event_handlers: HashMap<String, Vec<Arc<dyn EventHandler>>>,
// Block handlers in registration order.
block_handlers: Vec<Arc<dyn BlockHandler>>,
// Reorg handlers in registration order.
reorg_handlers: Vec<Arc<dyn ReorgHandler>>,
}
impl HandlerRegistry {
pub fn new() -> Self {
Self {
event_handlers: HashMap::new(),
block_handlers: vec![],
reorg_handlers: vec![],
}
}
pub fn on_event(&mut self, handler: Arc<dyn EventHandler>) {
self.event_handlers
.entry(handler.schema_name().to_string())
.or_default()
.push(handler);
}
pub fn on_block(&mut self, handler: Arc<dyn BlockHandler>) {
self.block_handlers.push(handler);
}
pub fn on_reorg(&mut self, handler: Arc<dyn ReorgHandler>) {
self.reorg_handlers.push(handler);
}
pub async fn dispatch_event(
&self,
event: &DecodedEvent,
ctx: &IndexContext,
) -> Result<(), IndexerError> {
if let Some(handlers) = self.event_handlers.get(&event.schema) {
for handler in handlers {
handler.handle(event, ctx).await?;
}
}
Ok(())
}
pub async fn dispatch_block(
&self,
block: &BlockSummary,
ctx: &IndexContext,
) -> Result<(), IndexerError> {
for handler in &self.block_handlers {
handler.handle_block(block, ctx).await?;
}
Ok(())
}
pub async fn dispatch_reorg(
&self,
dropped: &[BlockSummary],
ctx: &IndexContext,
) -> Result<(), IndexerError> {
for handler in &self.reorg_handlers {
handler.on_reorg(dropped, ctx).await?;
}
Ok(())
}
}
impl Default for HandlerRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Test handler: field 0 is a shared call counter, field 1 is the schema
    /// name the handler reports.
    struct Counter(Arc<AtomicU32>, String);

    #[async_trait]
    impl EventHandler for Counter {
        /// Records one invocation and always succeeds.
        async fn handle(
            &self,
            _event: &DecodedEvent,
            _ctx: &IndexContext,
        ) -> Result<(), IndexerError> {
            self.0.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }

        fn schema_name(&self) -> &str {
            self.1.as_str()
        }
    }

    /// Builds a placeholder context; the values are arbitrary fixtures.
    fn dummy_ctx() -> IndexContext {
        let block = BlockSummary {
            number: 1,
            hash: "0xa".into(),
            parent_hash: "0x0".into(),
            timestamp: 0,
            tx_count: 0,
        };
        IndexContext {
            block,
            phase: crate::types::IndexPhase::Backfill,
            chain: "ethereum".into(),
        }
    }

    /// Builds a placeholder event carrying the given schema name.
    fn dummy_event(schema: &str) -> DecodedEvent {
        DecodedEvent {
            chain: String::from("ethereum"),
            schema: schema.to_owned(),
            address: String::from("0x0"),
            tx_hash: String::from("0x0"),
            block_number: 1,
            log_index: 0,
            fields_json: serde_json::Value::Null,
        }
    }

    /// A handler must fire for its own schema and stay silent for others.
    #[tokio::test]
    async fn event_handler_dispatch() {
        let hits = Arc::new(AtomicU32::new(0));
        let mut registry = HandlerRegistry::new();
        registry.on_event(Arc::new(Counter(
            Arc::clone(&hits),
            "ERC20Transfer".into(),
        )));
        let ctx = dummy_ctx();

        // Matching schema: the counter handler runs once.
        let matching = dummy_event("ERC20Transfer");
        registry.dispatch_event(&matching, &ctx).await.unwrap();

        // Unregistered schema: dispatch succeeds without calling anything.
        let unrelated = dummy_event("UniswapSwap");
        registry.dispatch_event(&unrelated, &ctx).await.unwrap();

        assert_eq!(hits.load(Ordering::Relaxed), 1);
    }
}