ghost_crab/
event_handler.rs

1use crate::indexer::rpc_manager::Provider;
2use crate::indexer::templates::TemplateManager;
3use crate::latest_block_manager::LatestBlockManager;
4use alloy::eips::BlockNumberOrTag;
5use alloy::primitives::Address;
6use alloy::providers::Provider as AlloyProvider;
7use alloy::rpc::types::eth::Filter;
8use alloy::rpc::types::eth::Log;
9use alloy::rpc::types::Block;
10use alloy::transports::TransportError;
11use async_trait::async_trait;
12use ghost_crab_common::config::ExecutionMode;
13use std::sync::Arc;
14use std::time::Duration;
15
16pub struct EventContext {
17    pub log: Log,
18    pub provider: Provider,
19    pub templates: TemplateManager,
20    pub contract_address: Address,
21}
22
23impl EventContext {
24    pub async fn block(&self, hydrate: bool) -> Result<Option<Block>, TransportError> {
25        match self.log.block_number {
26            Some(block_number) => {
27                self.provider
28                    .get_block_by_number(BlockNumberOrTag::Number(block_number), hydrate)
29                    .await
30            }
31            None => Err(TransportError::local_usage_str("Error occurred while fetching the current block number within an EventHandler. The log.block_number value is None.")),
32        }
33    }
34}
35
36pub type EventHandlerInstance = Arc<Box<(dyn EventHandler + Send + Sync)>>;
37
38#[async_trait]
39pub trait EventHandler {
40    async fn handle(&self, params: EventContext);
41    fn name(&self) -> String;
42    fn event_signature(&self) -> String;
43}
44
45#[derive(Clone)]
46pub struct ProcessEventsInput {
47    pub start_block: u64,
48    pub address: Address,
49    pub step: u64,
50    pub handler: EventHandlerInstance,
51    pub templates: TemplateManager,
52    pub provider: Provider,
53    pub execution_mode: ExecutionMode,
54}
55
56pub async fn process_events(
57    ProcessEventsInput { start_block, execution_mode, step, address, handler, templates, provider }: ProcessEventsInput,
58) -> Result<(), TransportError> {
59    let event_signature = handler.event_signature();
60
61    let mut current_block = start_block;
62    let mut latest_block_manager =
63        LatestBlockManager::new(provider.clone(), Duration::from_secs(10));
64
65    loop {
66        let mut end_block = current_block + step;
67        let latest_block = latest_block_manager.get().await?;
68
69        if end_block > latest_block {
70            end_block = latest_block;
71        }
72
73        if current_block >= end_block {
74            tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
75            continue;
76        }
77
78        let source = handler.name();
79
80        println!("[{}] Processing logs from {} to {}", source, current_block, end_block);
81
82        let filter = Filter::new()
83            .address(address)
84            .event(&event_signature)
85            .from_block(current_block)
86            .to_block(end_block);
87
88        let logs = provider.get_logs(&filter).await?;
89
90        match execution_mode {
91            ExecutionMode::Parallel => {
92                for log in logs {
93                    let handler = handler.clone();
94                    let provider = provider.clone();
95                    let templates = templates.clone();
96
97                    tokio::spawn(async move {
98                        handler
99                            .handle(EventContext {
100                                log,
101                                provider,
102                                templates,
103                                contract_address: address,
104                            })
105                            .await;
106                    });
107                }
108            }
109            ExecutionMode::Serial => {
110                for log in logs {
111                    let templates = templates.clone();
112                    let provider = provider.clone();
113
114                    handler
115                        .handle(EventContext {
116                            log,
117                            provider,
118                            templates,
119                            contract_address: address,
120                        })
121                        .await;
122                }
123            }
124        }
125
126        current_block = end_block;
127    }
128}