chainindex_core/
handler.rs1use async_trait::async_trait;
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use crate::error::IndexerError;
8use crate::types::{BlockSummary, IndexContext};
9
10#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
15pub struct DecodedEvent {
16 pub chain: String,
18 pub schema: String,
20 pub address: String,
22 pub tx_hash: String,
24 pub block_number: u64,
26 pub log_index: u32,
28 pub fields_json: serde_json::Value,
30}
31
32#[async_trait]
36pub trait EventHandler: Send + Sync {
37 async fn handle(&self, event: &DecodedEvent, ctx: &IndexContext) -> Result<(), IndexerError>;
39
40 fn schema_name(&self) -> &str;
42}
43
44#[async_trait]
48pub trait BlockHandler: Send + Sync {
49 async fn handle_block(
50 &self,
51 block: &BlockSummary,
52 ctx: &IndexContext,
53 ) -> Result<(), IndexerError>;
54}
55
56#[async_trait]
60pub trait ReorgHandler: Send + Sync {
61 async fn on_reorg(
62 &self,
63 dropped: &[BlockSummary],
64 ctx: &IndexContext,
65 ) -> Result<(), IndexerError>;
66}
67
68pub struct HandlerRegistry {
70 event_handlers: HashMap<String, Vec<Arc<dyn EventHandler>>>,
71 block_handlers: Vec<Arc<dyn BlockHandler>>,
72 reorg_handlers: Vec<Arc<dyn ReorgHandler>>,
73}
74
75impl HandlerRegistry {
76 pub fn new() -> Self {
77 Self {
78 event_handlers: HashMap::new(),
79 block_handlers: vec![],
80 reorg_handlers: vec![],
81 }
82 }
83
84 pub fn on_event(&mut self, handler: Arc<dyn EventHandler>) {
86 self.event_handlers
87 .entry(handler.schema_name().to_string())
88 .or_default()
89 .push(handler);
90 }
91
92 pub fn on_block(&mut self, handler: Arc<dyn BlockHandler>) {
94 self.block_handlers.push(handler);
95 }
96
97 pub fn on_reorg(&mut self, handler: Arc<dyn ReorgHandler>) {
99 self.reorg_handlers.push(handler);
100 }
101
102 pub async fn dispatch_event(
104 &self,
105 event: &DecodedEvent,
106 ctx: &IndexContext,
107 ) -> Result<(), IndexerError> {
108 if let Some(handlers) = self.event_handlers.get(&event.schema) {
109 for handler in handlers {
110 handler.handle(event, ctx).await?;
111 }
112 }
113 Ok(())
114 }
115
116 pub async fn dispatch_block(
118 &self,
119 block: &BlockSummary,
120 ctx: &IndexContext,
121 ) -> Result<(), IndexerError> {
122 for handler in &self.block_handlers {
123 handler.handle_block(block, ctx).await?;
124 }
125 Ok(())
126 }
127
128 pub async fn dispatch_reorg(
130 &self,
131 dropped: &[BlockSummary],
132 ctx: &IndexContext,
133 ) -> Result<(), IndexerError> {
134 for handler in &self.reorg_handlers {
135 handler.on_reorg(dropped, ctx).await?;
136 }
137 Ok(())
138 }
139}
140
141impl Default for HandlerRegistry {
142 fn default() -> Self {
143 Self::new()
144 }
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Event handler that counts its invocations for a single schema.
    struct Counter {
        hits: Arc<AtomicU32>,
        schema: String,
    }

    #[async_trait]
    impl EventHandler for Counter {
        fn schema_name(&self) -> &str {
            self.schema.as_str()
        }

        async fn handle(&self, _e: &DecodedEvent, _c: &IndexContext) -> Result<(), IndexerError> {
            self.hits.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }
    }

    /// Minimal context; the values are placeholders the test does not inspect.
    fn dummy_ctx() -> IndexContext {
        let block = crate::types::BlockSummary {
            number: 1,
            hash: "0xa".into(),
            parent_hash: "0x0".into(),
            timestamp: 0,
            tx_count: 0,
        };
        IndexContext {
            block,
            phase: crate::types::IndexPhase::Backfill,
            chain: "ethereum".into(),
        }
    }

    /// Placeholder event whose only meaningful field is `schema`.
    fn dummy_event(schema: &str) -> DecodedEvent {
        DecodedEvent {
            chain: "ethereum".into(),
            schema: schema.to_owned(),
            address: "0x0".into(),
            tx_hash: "0x0".into(),
            block_number: 1,
            log_index: 0,
            fields_json: serde_json::Value::Null,
        }
    }

    #[tokio::test]
    async fn event_handler_dispatch() {
        let hits = Arc::new(AtomicU32::new(0));

        let mut registry = HandlerRegistry::new();
        registry.on_event(Arc::new(Counter {
            hits: Arc::clone(&hits),
            schema: "ERC20Transfer".into(),
        }));

        let ctx = dummy_ctx();
        // First schema matches the registered handler, the second has none.
        for schema in ["ERC20Transfer", "UniswapSwap"] {
            registry
                .dispatch_event(&dummy_event(schema), &ctx)
                .await
                .unwrap();
        }

        // Only the matching event may have reached the handler.
        assert_eq!(hits.load(Ordering::Relaxed), 1);
    }
}