1use std::sync::Arc;
37
38use async_trait::async_trait;
39
40use crate::error::IndexerError;
41use crate::handler::{DecodedEvent, EventHandler};
42use crate::types::IndexContext;
43
44pub fn deterministic_id(event: &DecodedEvent) -> String {
53 format!("{}-{}", event.tx_hash, event.log_index)
54}
55
56pub fn deterministic_id_with_suffix(event: &DecodedEvent, suffix: &str) -> String {
63 format!("{}-{}-{}", event.tx_hash, event.log_index, suffix)
64}
65
66#[derive(Debug, Clone)]
74pub struct ReplayContext {
75 pub is_replay: bool,
77 pub reorg_from_block: Option<u64>,
80 pub original_block_hash: Option<String>,
83}
84
85impl ReplayContext {
86 pub fn normal() -> Self {
88 Self {
89 is_replay: false,
90 reorg_from_block: None,
91 original_block_hash: None,
92 }
93 }
94
95 pub fn replay(reorg_from_block: u64, original_block_hash: Option<String>) -> Self {
97 Self {
98 is_replay: true,
99 reorg_from_block: Some(reorg_from_block),
100 original_block_hash,
101 }
102 }
103}
104
105pub struct SideEffectGuard {
127 execute: bool,
129}
130
131impl SideEffectGuard {
132 pub fn new(replay_ctx: &ReplayContext) -> Self {
136 Self {
137 execute: !replay_ctx.is_replay,
138 }
139 }
140
141 pub fn should_execute(&self) -> bool {
143 self.execute
144 }
145
146 pub async fn execute<F, Fut, T>(&self, f: F) -> Option<T>
151 where
152 F: FnOnce() -> Fut,
153 Fut: std::future::Future<Output = T>,
154 {
155 if self.execute {
156 Some(f().await)
157 } else {
158 None
159 }
160 }
161}
162
163pub struct IdempotentHandler {
177 inner: Arc<dyn EventHandler>,
179 replay_ctx: ReplayContext,
181 processed_ids: std::sync::Mutex<std::collections::HashSet<String>>,
183}
184
185impl IdempotentHandler {
186 pub fn new(inner: Arc<dyn EventHandler>, replay_ctx: ReplayContext) -> Self {
188 Self {
189 inner,
190 replay_ctx,
191 processed_ids: std::sync::Mutex::new(std::collections::HashSet::new()),
192 }
193 }
194
195 pub fn replay_context(&self) -> &ReplayContext {
197 &self.replay_ctx
198 }
199
200 pub fn processed_count(&self) -> usize {
202 self.processed_ids.lock().map(|ids| ids.len()).unwrap_or(0)
203 }
204
205 pub fn has_processed(&self, event_id: &str) -> bool {
207 self.processed_ids
208 .lock()
209 .map(|ids| ids.contains(event_id))
210 .unwrap_or(false)
211 }
212
213 pub fn side_effect_guard(&self) -> SideEffectGuard {
215 SideEffectGuard::new(&self.replay_ctx)
216 }
217}
218
219#[async_trait]
220impl EventHandler for IdempotentHandler {
221 async fn handle(&self, event: &DecodedEvent, ctx: &IndexContext) -> Result<(), IndexerError> {
222 let event_id = deterministic_id(event);
223
224 if let Ok(mut ids) = self.processed_ids.lock() {
226 ids.insert(event_id);
227 }
228
229 self.inner.handle(event, ctx).await
231 }
232
233 fn schema_name(&self) -> &str {
234 self.inner.schema_name()
235 }
236}
237
238#[cfg(test)]
241mod tests {
242 use super::*;
243 use std::sync::atomic::{AtomicU32, Ordering};
244
245 fn make_event(tx_hash: &str, log_index: u32) -> DecodedEvent {
246 DecodedEvent {
247 chain: "ethereum".into(),
248 schema: "ERC20Transfer".into(),
249 address: "0xdead".into(),
250 tx_hash: tx_hash.to_string(),
251 block_number: 100,
252 log_index,
253 fields_json: serde_json::json!({}),
254 }
255 }
256
257 fn dummy_ctx() -> IndexContext {
258 IndexContext {
259 block: crate::types::BlockSummary {
260 number: 100,
261 hash: "0xa".into(),
262 parent_hash: "0x0".into(),
263 timestamp: 0,
264 tx_count: 0,
265 },
266 phase: crate::types::IndexPhase::Backfill,
267 chain: "ethereum".into(),
268 }
269 }
270
271 #[test]
274 fn deterministic_id_is_stable() {
275 let event = make_event("0xabc123", 3);
276 let id1 = deterministic_id(&event);
277 let id2 = deterministic_id(&event);
278 assert_eq!(id1, id2);
279 assert_eq!(id1, "0xabc123-3");
280 }
281
282 #[test]
283 fn different_events_get_different_ids() {
284 let e1 = make_event("0xabc", 0);
285 let e2 = make_event("0xabc", 1);
286 let e3 = make_event("0xdef", 0);
287
288 assert_ne!(deterministic_id(&e1), deterministic_id(&e2));
289 assert_ne!(deterministic_id(&e1), deterministic_id(&e3));
290 }
291
292 #[test]
293 fn deterministic_id_with_suffix_works() {
294 let event = make_event("0xabc", 2);
295 let id = deterministic_id_with_suffix(&event, "buy");
296 assert_eq!(id, "0xabc-2-buy");
297
298 let id2 = deterministic_id_with_suffix(&event, "sell");
299 assert_eq!(id2, "0xabc-2-sell");
300 assert_ne!(id, id2);
301 }
302
303 #[test]
306 fn replay_context_normal() {
307 let ctx = ReplayContext::normal();
308 assert!(!ctx.is_replay);
309 assert!(ctx.reorg_from_block.is_none());
310 assert!(ctx.original_block_hash.is_none());
311 }
312
313 #[test]
314 fn replay_context_replay() {
315 let ctx = ReplayContext::replay(100, Some("0xold_hash".to_string()));
316 assert!(ctx.is_replay);
317 assert_eq!(ctx.reorg_from_block, Some(100));
318 assert_eq!(ctx.original_block_hash.as_deref(), Some("0xold_hash"));
319 }
320
321 #[test]
324 fn side_effect_guard_executes_normally() {
325 let ctx = ReplayContext::normal();
326 let guard = SideEffectGuard::new(&ctx);
327 assert!(guard.should_execute());
328 }
329
330 #[test]
331 fn side_effect_guard_skips_during_replay() {
332 let ctx = ReplayContext::replay(100, None);
333 let guard = SideEffectGuard::new(&ctx);
334 assert!(!guard.should_execute());
335 }
336
337 #[tokio::test]
338 async fn side_effect_guard_execute_fn_normal() {
339 let ctx = ReplayContext::normal();
340 let guard = SideEffectGuard::new(&ctx);
341
342 let result = guard.execute(|| async { 42 }).await;
343 assert_eq!(result, Some(42));
344 }
345
346 #[tokio::test]
347 async fn side_effect_guard_execute_fn_replay() {
348 let ctx = ReplayContext::replay(100, None);
349 let guard = SideEffectGuard::new(&ctx);
350
351 let result = guard.execute(|| async { 42 }).await;
352 assert_eq!(result, None);
353 }
354
355 struct CountingHandler {
358 count: Arc<AtomicU32>,
359 schema: String,
360 }
361
362 #[async_trait]
363 impl EventHandler for CountingHandler {
364 async fn handle(
365 &self,
366 _event: &DecodedEvent,
367 _ctx: &IndexContext,
368 ) -> Result<(), IndexerError> {
369 self.count.fetch_add(1, Ordering::Relaxed);
370 Ok(())
371 }
372
373 fn schema_name(&self) -> &str {
374 &self.schema
375 }
376 }
377
378 #[tokio::test]
379 async fn idempotent_handler_wraps_inner() {
380 let count = Arc::new(AtomicU32::new(0));
381 let inner = Arc::new(CountingHandler {
382 count: count.clone(),
383 schema: "ERC20Transfer".into(),
384 });
385
386 let handler = IdempotentHandler::new(inner, ReplayContext::normal());
387 assert_eq!(handler.schema_name(), "ERC20Transfer");
388
389 let event = make_event("0xabc", 0);
390 let ctx = dummy_ctx();
391
392 handler.handle(&event, &ctx).await.unwrap();
393 assert_eq!(count.load(Ordering::Relaxed), 1);
394 assert_eq!(handler.processed_count(), 1);
395 assert!(handler.has_processed("0xabc-0"));
396
397 handler.handle(&event, &ctx).await.unwrap();
399 assert_eq!(count.load(Ordering::Relaxed), 2);
400 assert_eq!(handler.processed_count(), 1);
402 }
403
404 #[tokio::test]
405 async fn idempotent_handler_tracks_multiple_events() {
406 let count = Arc::new(AtomicU32::new(0));
407 let inner = Arc::new(CountingHandler {
408 count: count.clone(),
409 schema: "ERC20Transfer".into(),
410 });
411
412 let handler = IdempotentHandler::new(inner, ReplayContext::normal());
413 let ctx = dummy_ctx();
414
415 handler.handle(&make_event("0xabc", 0), &ctx).await.unwrap();
416 handler.handle(&make_event("0xabc", 1), &ctx).await.unwrap();
417 handler.handle(&make_event("0xdef", 0), &ctx).await.unwrap();
418
419 assert_eq!(handler.processed_count(), 3);
420 assert!(handler.has_processed("0xabc-0"));
421 assert!(handler.has_processed("0xabc-1"));
422 assert!(handler.has_processed("0xdef-0"));
423 assert!(!handler.has_processed("0xghi-0"));
424 }
425
426 #[test]
427 fn idempotent_handler_side_effect_guard_normal() {
428 let inner = Arc::new(CountingHandler {
429 count: Arc::new(AtomicU32::new(0)),
430 schema: "Test".into(),
431 });
432 let handler = IdempotentHandler::new(inner, ReplayContext::normal());
433 assert!(handler.side_effect_guard().should_execute());
434 }
435
436 #[test]
437 fn idempotent_handler_side_effect_guard_replay() {
438 let inner = Arc::new(CountingHandler {
439 count: Arc::new(AtomicU32::new(0)),
440 schema: "Test".into(),
441 });
442 let handler = IdempotentHandler::new(inner, ReplayContext::replay(100, None));
443 assert!(!handler.side_effect_guard().should_execute());
444 }
445}