Skip to main content

chainindex_core/
handler.rs

1//! Event and block handler traits + registry.
2
3use async_trait::async_trait;
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use crate::error::IndexerError;
8use crate::types::{BlockSummary, IndexContext};
9
10/// A decoded blockchain event (chain-agnostic representation).
11///
12/// In practice this will be a re-export of chaincodec's `DecodedEvent`, but
13/// chainindex-core avoids a direct chaincodec dependency to stay modular.
14#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
15pub struct DecodedEvent {
16    /// Chain identifier (e.g. `"ethereum"`, `"arbitrum"`).
17    pub chain: String,
18    /// The schema/event name (e.g. `"ERC20Transfer"`).
19    pub schema: String,
20    /// Contract address that emitted the event.
21    pub address: String,
22    /// Transaction hash.
23    pub tx_hash: String,
24    /// Block number.
25    pub block_number: u64,
26    /// Log index within the block.
27    pub log_index: u32,
28    /// Raw decoded fields as JSON for flexibility.
29    pub fields_json: serde_json::Value,
30}
31
32/// Trait for user-provided event handlers.
33///
34/// Implement this to process specific event types during indexing.
35#[async_trait]
36pub trait EventHandler: Send + Sync {
37    /// Called for each decoded event that matches the handler's schema.
38    async fn handle(&self, event: &DecodedEvent, ctx: &IndexContext) -> Result<(), IndexerError>;
39
40    /// The event schema name this handler processes (e.g. `"ERC20Transfer"`).
41    fn schema_name(&self) -> &str;
42}
43
44/// Trait for user-provided block handlers.
45///
46/// Called once per block regardless of events.
47#[async_trait]
48pub trait BlockHandler: Send + Sync {
49    async fn handle_block(
50        &self,
51        block: &BlockSummary,
52        ctx: &IndexContext,
53    ) -> Result<(), IndexerError>;
54}
55
56/// Trait for reorg handlers.
57///
58/// Called when a chain reorganization is detected.
59#[async_trait]
60pub trait ReorgHandler: Send + Sync {
61    async fn on_reorg(
62        &self,
63        dropped: &[BlockSummary],
64        ctx: &IndexContext,
65    ) -> Result<(), IndexerError>;
66}
67
68/// Registry of event + block + reorg handlers.
69pub struct HandlerRegistry {
70    event_handlers: HashMap<String, Vec<Arc<dyn EventHandler>>>,
71    block_handlers: Vec<Arc<dyn BlockHandler>>,
72    reorg_handlers: Vec<Arc<dyn ReorgHandler>>,
73}
74
75impl HandlerRegistry {
76    pub fn new() -> Self {
77        Self {
78            event_handlers: HashMap::new(),
79            block_handlers: vec![],
80            reorg_handlers: vec![],
81        }
82    }
83
84    /// Register an event handler for a specific schema name.
85    pub fn on_event(&mut self, handler: Arc<dyn EventHandler>) {
86        self.event_handlers
87            .entry(handler.schema_name().to_string())
88            .or_default()
89            .push(handler);
90    }
91
92    /// Register a block handler (called for every block).
93    pub fn on_block(&mut self, handler: Arc<dyn BlockHandler>) {
94        self.block_handlers.push(handler);
95    }
96
97    /// Register a reorg handler.
98    pub fn on_reorg(&mut self, handler: Arc<dyn ReorgHandler>) {
99        self.reorg_handlers.push(handler);
100    }
101
102    /// Dispatch an event to all matching handlers.
103    pub async fn dispatch_event(
104        &self,
105        event: &DecodedEvent,
106        ctx: &IndexContext,
107    ) -> Result<(), IndexerError> {
108        if let Some(handlers) = self.event_handlers.get(&event.schema) {
109            for handler in handlers {
110                handler.handle(event, ctx).await?;
111            }
112        }
113        Ok(())
114    }
115
116    /// Dispatch a block to all block handlers.
117    pub async fn dispatch_block(
118        &self,
119        block: &BlockSummary,
120        ctx: &IndexContext,
121    ) -> Result<(), IndexerError> {
122        for handler in &self.block_handlers {
123            handler.handle_block(block, ctx).await?;
124        }
125        Ok(())
126    }
127
128    /// Dispatch a reorg event to all reorg handlers.
129    pub async fn dispatch_reorg(
130        &self,
131        dropped: &[BlockSummary],
132        ctx: &IndexContext,
133    ) -> Result<(), IndexerError> {
134        for handler in &self.reorg_handlers {
135            handler.on_reorg(dropped, ctx).await?;
136        }
137        Ok(())
138    }
139}
140
141impl Default for HandlerRegistry {
142    fn default() -> Self {
143        Self::new()
144    }
145}
146
147#[cfg(test)]
148mod tests {
149    use super::*;
150    use std::sync::atomic::{AtomicU32, Ordering};
151
152    struct Counter(Arc<AtomicU32>, String);
153
154    #[async_trait]
155    impl EventHandler for Counter {
156        async fn handle(&self, _e: &DecodedEvent, _c: &IndexContext) -> Result<(), IndexerError> {
157            self.0.fetch_add(1, Ordering::Relaxed);
158            Ok(())
159        }
160        fn schema_name(&self) -> &str {
161            &self.1
162        }
163    }
164
165    fn dummy_ctx() -> IndexContext {
166        IndexContext {
167            block: crate::types::BlockSummary {
168                number: 1,
169                hash: "0xa".into(),
170                parent_hash: "0x0".into(),
171                timestamp: 0,
172                tx_count: 0,
173            },
174            phase: crate::types::IndexPhase::Backfill,
175            chain: "ethereum".into(),
176        }
177    }
178
179    fn dummy_event(schema: &str) -> DecodedEvent {
180        DecodedEvent {
181            chain: "ethereum".into(),
182            schema: schema.to_string(),
183            address: "0x0".into(),
184            tx_hash: "0x0".into(),
185            block_number: 1,
186            log_index: 0,
187            fields_json: serde_json::Value::Null,
188        }
189    }
190
191    #[tokio::test]
192    async fn event_handler_dispatch() {
193        let count = Arc::new(AtomicU32::new(0));
194        let handler = Arc::new(Counter(count.clone(), "ERC20Transfer".into()));
195
196        let mut registry = HandlerRegistry::new();
197        registry.on_event(handler);
198
199        let ctx = dummy_ctx();
200        registry
201            .dispatch_event(&dummy_event("ERC20Transfer"), &ctx)
202            .await
203            .unwrap();
204        registry
205            .dispatch_event(&dummy_event("UniswapSwap"), &ctx)
206            .await
207            .unwrap(); // no handler
208
209        assert_eq!(count.load(Ordering::Relaxed), 1);
210    }
211}