ghost_crab/
event_handler.rs1use crate::indexer::rpc_manager::Provider;
2use crate::indexer::templates::TemplateManager;
3use crate::latest_block_manager::LatestBlockManager;
4use alloy::eips::BlockNumberOrTag;
5use alloy::primitives::Address;
6use alloy::providers::Provider as AlloyProvider;
7use alloy::rpc::types::eth::Filter;
8use alloy::rpc::types::eth::Log;
9use alloy::rpc::types::Block;
10use alloy::transports::TransportError;
11use async_trait::async_trait;
12use ghost_crab_common::config::ExecutionMode;
13use std::sync::Arc;
14use std::time::Duration;
15
16pub struct EventContext {
17 pub log: Log,
18 pub provider: Provider,
19 pub templates: TemplateManager,
20 pub contract_address: Address,
21}
22
23impl EventContext {
24 pub async fn block(&self, hydrate: bool) -> Result<Option<Block>, TransportError> {
25 match self.log.block_number {
26 Some(block_number) => {
27 self.provider
28 .get_block_by_number(BlockNumberOrTag::Number(block_number), hydrate)
29 .await
30 }
31 None => Err(TransportError::local_usage_str("Error occurred while fetching the current block number within an EventHandler. The log.block_number value is None.")),
32 }
33 }
34}
35
36pub type EventHandlerInstance = Arc<Box<(dyn EventHandler + Send + Sync)>>;
37
38#[async_trait]
39pub trait EventHandler {
40 async fn handle(&self, params: EventContext);
41 fn name(&self) -> String;
42 fn event_signature(&self) -> String;
43}
44
45#[derive(Clone)]
46pub struct ProcessEventsInput {
47 pub start_block: u64,
48 pub address: Address,
49 pub step: u64,
50 pub handler: EventHandlerInstance,
51 pub templates: TemplateManager,
52 pub provider: Provider,
53 pub execution_mode: ExecutionMode,
54}
55
56pub async fn process_events(
57 ProcessEventsInput { start_block, execution_mode, step, address, handler, templates, provider }: ProcessEventsInput,
58) -> Result<(), TransportError> {
59 let event_signature = handler.event_signature();
60
61 let mut current_block = start_block;
62 let mut latest_block_manager =
63 LatestBlockManager::new(provider.clone(), Duration::from_secs(10));
64
65 loop {
66 let mut end_block = current_block + step;
67 let latest_block = latest_block_manager.get().await?;
68
69 if end_block > latest_block {
70 end_block = latest_block;
71 }
72
73 if current_block >= end_block {
74 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
75 continue;
76 }
77
78 let source = handler.name();
79
80 println!("[{}] Processing logs from {} to {}", source, current_block, end_block);
81
82 let filter = Filter::new()
83 .address(address)
84 .event(&event_signature)
85 .from_block(current_block)
86 .to_block(end_block);
87
88 let logs = provider.get_logs(&filter).await?;
89
90 match execution_mode {
91 ExecutionMode::Parallel => {
92 for log in logs {
93 let handler = handler.clone();
94 let provider = provider.clone();
95 let templates = templates.clone();
96
97 tokio::spawn(async move {
98 handler
99 .handle(EventContext {
100 log,
101 provider,
102 templates,
103 contract_address: address,
104 })
105 .await;
106 });
107 }
108 }
109 ExecutionMode::Serial => {
110 for log in logs {
111 let templates = templates.clone();
112 let provider = provider.clone();
113
114 handler
115 .handle(EventContext {
116 log,
117 provider,
118 templates,
119 contract_address: address,
120 })
121 .await;
122 }
123 }
124 }
125
126 current_block = end_block;
127 }
128}