Skip to main content

chainindex_core/
idempotency.rs

1//! Idempotency framework — ensures handlers produce correct state
2//! even when replayed after chain reorganizations.
3//!
4//! # Key concepts
5//!
6//! - **Deterministic entity IDs**: `{tx_hash}-{log_index}` ensures the same
7//!   event always produces the same entity ID, regardless of how many times
8//!   it is replayed.
9//! - **Reorg replay detection**: handlers receive a [`ReplayContext`] that
10//!   tells them whether the current execution is a replay of blocks that
11//!   were previously processed (after a reorg rollback).
12//! - **Side effect guard**: [`SideEffectGuard`] lets handlers skip external
13//!   API calls (webhooks, notifications) during replays while still
14//!   rebuilding local state correctly.
15//!
16//! # Example
17//!
18//! ```rust
19//! use chainindex_core::idempotency::{deterministic_id, ReplayContext, SideEffectGuard};
20//! use chainindex_core::handler::DecodedEvent;
21//!
22//! let event = DecodedEvent {
23//!     chain: "ethereum".into(),
24//!     schema: "ERC20Transfer".into(),
25//!     address: "0xdead".into(),
26//!     tx_hash: "0xabc123".into(),
27//!     block_number: 100,
28//!     log_index: 3,
29//!     fields_json: serde_json::json!({}),
30//! };
31//!
32//! let id = deterministic_id(&event);
33//! assert_eq!(id, "0xabc123-3");
34//! ```
35
36use std::sync::Arc;
37
38use async_trait::async_trait;
39
40use crate::error::IndexerError;
41use crate::handler::{DecodedEvent, EventHandler};
42use crate::types::IndexContext;
43
44// ─── Deterministic ID generation ─────────────────────────────────────────────
45
46/// Generate a deterministic entity ID from an event.
47///
48/// Format: `{tx_hash}-{log_index}`
49///
50/// This guarantees that the same on-chain event always maps to the same
51/// entity ID, making upsert operations idempotent across replays.
52pub fn deterministic_id(event: &DecodedEvent) -> String {
53    format!("{}-{}", event.tx_hash, event.log_index)
54}
55
56/// Generate a deterministic entity ID with a custom suffix.
57///
58/// Format: `{tx_hash}-{log_index}-{suffix}`
59///
60/// Useful when a single event produces multiple entities (e.g., a swap
61/// event creates both a "buy" and "sell" entity).
62pub fn deterministic_id_with_suffix(event: &DecodedEvent, suffix: &str) -> String {
63    format!("{}-{}-{}", event.tx_hash, event.log_index, suffix)
64}
65
66// ─── ReplayContext ───────────────────────────────────────────────────────────
67
/// Context about whether the current execution is a reorg replay.
///
/// Passed to handlers so they can adjust their behavior during replay.
/// For example, handlers should skip sending webhooks or notifications
/// during replay, but still update local entity state.
///
/// Prefer the [`ReplayContext::normal`] / [`ReplayContext::replay`]
/// constructors. The fields are public, so a hand-built value can hold
/// inconsistent combinations (e.g. `is_replay: false` with a
/// `reorg_from_block`). The constructors always produce consistent ones.
///
/// [`Default`] is equivalent to [`ReplayContext::normal`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ReplayContext {
    /// `true` if the indexer is replaying blocks after a reorg.
    pub is_replay: bool,
    /// The block number where the reorg was detected (fork point).
    /// `None` if this is not a replay.
    pub reorg_from_block: Option<u64>,
    /// The original block hash that was replaced by the reorg.
    /// `None` if this is not a replay.
    pub original_block_hash: Option<String>,
}

impl ReplayContext {
    /// Create a normal (non-replay) context: `is_replay` is `false` and
    /// both optional fields are `None`.
    #[must_use]
    pub fn normal() -> Self {
        Self {
            is_replay: false,
            reorg_from_block: None,
            original_block_hash: None,
        }
    }

    /// Create a replay context for a reorg that forked at `reorg_from_block`.
    ///
    /// `original_block_hash` is the hash of the replaced block, if known.
    #[must_use]
    pub fn replay(reorg_from_block: u64, original_block_hash: Option<String>) -> Self {
        Self {
            is_replay: true,
            reorg_from_block: Some(reorg_from_block),
            original_block_hash,
        }
    }
}
104
105// ─── SideEffectGuard ─────────────────────────────────────────────────────────
106
107/// Guard that tracks whether side effects should be executed.
108///
109/// During normal indexing, side effects (webhooks, external API calls)
110/// are executed. During replay after a reorg, side effects are skipped
111/// because they were already executed during the original processing.
112///
113/// # Example
114///
115/// ```rust
116/// use chainindex_core::idempotency::{ReplayContext, SideEffectGuard};
117///
118/// let ctx = ReplayContext::normal();
119/// let guard = SideEffectGuard::new(&ctx);
120/// assert!(guard.should_execute()); // normal mode: execute side effects
121///
122/// let replay_ctx = ReplayContext::replay(100, None);
123/// let replay_guard = SideEffectGuard::new(&replay_ctx);
124/// assert!(!replay_guard.should_execute()); // replay mode: skip side effects
125/// ```
126pub struct SideEffectGuard {
127    /// Whether side effects should be executed.
128    execute: bool,
129}
130
131impl SideEffectGuard {
132    /// Create a new guard based on the replay context.
133    ///
134    /// Side effects are skipped when `replay_ctx.is_replay` is `true`.
135    pub fn new(replay_ctx: &ReplayContext) -> Self {
136        Self {
137            execute: !replay_ctx.is_replay,
138        }
139    }
140
141    /// Returns `true` if side effects should be executed (not in replay mode).
142    pub fn should_execute(&self) -> bool {
143        self.execute
144    }
145
146    /// Execute a side effect only if not in replay mode.
147    ///
148    /// Returns `Some(result)` if the side effect was executed, or `None`
149    /// if it was skipped (replay mode).
150    pub async fn execute<F, Fut, T>(&self, f: F) -> Option<T>
151    where
152        F: FnOnce() -> Fut,
153        Fut: std::future::Future<Output = T>,
154    {
155        if self.execute {
156            Some(f().await)
157        } else {
158            None
159        }
160    }
161}
162
163// ─── IdempotentHandler ───────────────────────────────────────────────────────
164
165/// Wrapper around an [`EventHandler`] that adds idempotency tracking.
166///
167/// The `IdempotentHandler` wraps an inner handler and:
168/// 1. Generates a deterministic event ID before calling the inner handler.
169/// 2. Tracks which events have been processed (by their deterministic ID).
170/// 3. In normal mode, always calls the inner handler (for upsert semantics).
171/// 4. Provides the [`ReplayContext`] for the inner handler to use.
172///
173/// Note: the inner handler is always called even for already-seen events,
174/// because entity upsert is the correct idempotent behavior. The tracking
175/// is for observability and metrics, not for skipping events.
176pub struct IdempotentHandler {
177    /// The wrapped event handler.
178    inner: Arc<dyn EventHandler>,
179    /// Current replay context.
180    replay_ctx: ReplayContext,
181    /// Set of processed event IDs (for tracking/metrics).
182    processed_ids: std::sync::Mutex<std::collections::HashSet<String>>,
183}
184
185impl IdempotentHandler {
186    /// Create a new idempotent handler wrapping the given event handler.
187    pub fn new(inner: Arc<dyn EventHandler>, replay_ctx: ReplayContext) -> Self {
188        Self {
189            inner,
190            replay_ctx,
191            processed_ids: std::sync::Mutex::new(std::collections::HashSet::new()),
192        }
193    }
194
195    /// Returns the current replay context.
196    pub fn replay_context(&self) -> &ReplayContext {
197        &self.replay_ctx
198    }
199
200    /// Returns the number of events processed by this handler.
201    pub fn processed_count(&self) -> usize {
202        self.processed_ids.lock().map(|ids| ids.len()).unwrap_or(0)
203    }
204
205    /// Returns `true` if an event with the given deterministic ID has been processed.
206    pub fn has_processed(&self, event_id: &str) -> bool {
207        self.processed_ids
208            .lock()
209            .map(|ids| ids.contains(event_id))
210            .unwrap_or(false)
211    }
212
213    /// Create a [`SideEffectGuard`] for this handler's replay context.
214    pub fn side_effect_guard(&self) -> SideEffectGuard {
215        SideEffectGuard::new(&self.replay_ctx)
216    }
217}
218
219#[async_trait]
220impl EventHandler for IdempotentHandler {
221    async fn handle(&self, event: &DecodedEvent, ctx: &IndexContext) -> Result<(), IndexerError> {
222        let event_id = deterministic_id(event);
223
224        // Track the event ID.
225        if let Ok(mut ids) = self.processed_ids.lock() {
226            ids.insert(event_id);
227        }
228
229        // Always call the inner handler (upsert semantics).
230        self.inner.handle(event, ctx).await
231    }
232
233    fn schema_name(&self) -> &str {
234        self.inner.schema_name()
235    }
236}
237
238// ─── Tests ───────────────────────────────────────────────────────────────────
239
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    fn make_event(tx_hash: &str, log_index: u32) -> DecodedEvent {
        DecodedEvent {
            chain: "ethereum".into(),
            schema: "ERC20Transfer".into(),
            address: "0xdead".into(),
            tx_hash: tx_hash.to_string(),
            block_number: 100,
            log_index,
            fields_json: serde_json::json!({}),
        }
    }

    fn dummy_ctx() -> IndexContext {
        IndexContext {
            block: crate::types::BlockSummary {
                number: 100,
                hash: "0xa".into(),
                parent_hash: "0x0".into(),
                timestamp: 0,
                tx_count: 0,
            },
            phase: crate::types::IndexPhase::Backfill,
            chain: "ethereum".into(),
        }
    }

    // ── deterministic_id ─────────────────────────────────────────────────

    #[test]
    fn deterministic_id_is_stable() {
        let event = make_event("0xabc123", 3);
        let id1 = deterministic_id(&event);
        let id2 = deterministic_id(&event);
        assert_eq!(id1, id2);
        assert_eq!(id1, "0xabc123-3");
    }

    #[test]
    fn different_events_get_different_ids() {
        let e1 = make_event("0xabc", 0);
        let e2 = make_event("0xabc", 1);
        let e3 = make_event("0xdef", 0);

        assert_ne!(deterministic_id(&e1), deterministic_id(&e2));
        assert_ne!(deterministic_id(&e1), deterministic_id(&e3));
    }

    #[test]
    fn deterministic_id_with_suffix_works() {
        let event = make_event("0xabc", 2);
        let id = deterministic_id_with_suffix(&event, "buy");
        assert_eq!(id, "0xabc-2-buy");

        let id2 = deterministic_id_with_suffix(&event, "sell");
        assert_eq!(id2, "0xabc-2-sell");
        assert_ne!(id, id2);
    }

    // ── ReplayContext ────────────────────────────────────────────────────

    #[test]
    fn replay_context_normal() {
        let ctx = ReplayContext::normal();
        assert!(!ctx.is_replay);
        assert!(ctx.reorg_from_block.is_none());
        assert!(ctx.original_block_hash.is_none());
    }

    #[test]
    fn replay_context_replay() {
        let ctx = ReplayContext::replay(100, Some("0xold_hash".to_string()));
        assert!(ctx.is_replay);
        assert_eq!(ctx.reorg_from_block, Some(100));
        assert_eq!(ctx.original_block_hash.as_deref(), Some("0xold_hash"));
    }

    // ── SideEffectGuard ──────────────────────────────────────────────────

    #[test]
    fn side_effect_guard_executes_normally() {
        let ctx = ReplayContext::normal();
        let guard = SideEffectGuard::new(&ctx);
        assert!(guard.should_execute());
    }

    #[test]
    fn side_effect_guard_skips_during_replay() {
        let ctx = ReplayContext::replay(100, None);
        let guard = SideEffectGuard::new(&ctx);
        assert!(!guard.should_execute());
    }

    #[tokio::test]
    async fn side_effect_guard_execute_fn_normal() {
        let ctx = ReplayContext::normal();
        let guard = SideEffectGuard::new(&ctx);

        let result = guard.execute(|| async { 42 }).await;
        assert_eq!(result, Some(42));
    }

    #[tokio::test]
    async fn side_effect_guard_execute_fn_replay() {
        let ctx = ReplayContext::replay(100, None);
        let guard = SideEffectGuard::new(&ctx);

        let result = guard.execute(|| async { 42 }).await;
        assert_eq!(result, None);
    }

    /// In replay mode the side-effect closure must not even be called
    /// (not merely have its future dropped).
    #[tokio::test]
    async fn side_effect_guard_replay_does_not_invoke_closure() {
        let ctx = ReplayContext::replay(100, None);
        let guard = SideEffectGuard::new(&ctx);
        let calls = AtomicU32::new(0);

        let result = guard
            .execute(|| {
                calls.fetch_add(1, Ordering::Relaxed);
                async { 42 }
            })
            .await;

        assert_eq!(result, None);
        assert_eq!(calls.load(Ordering::Relaxed), 0);
    }

    // ── IdempotentHandler ────────────────────────────────────────────────

    struct CountingHandler {
        count: Arc<AtomicU32>,
        schema: String,
    }

    #[async_trait]
    impl EventHandler for CountingHandler {
        async fn handle(
            &self,
            _event: &DecodedEvent,
            _ctx: &IndexContext,
        ) -> Result<(), IndexerError> {
            self.count.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }

        fn schema_name(&self) -> &str {
            &self.schema
        }
    }

    #[tokio::test]
    async fn idempotent_handler_wraps_inner() {
        let count = Arc::new(AtomicU32::new(0));
        let inner = Arc::new(CountingHandler {
            count: count.clone(),
            schema: "ERC20Transfer".into(),
        });

        let handler = IdempotentHandler::new(inner, ReplayContext::normal());
        assert_eq!(handler.schema_name(), "ERC20Transfer");

        let event = make_event("0xabc", 0);
        let ctx = dummy_ctx();

        handler.handle(&event, &ctx).await.unwrap();
        assert_eq!(count.load(Ordering::Relaxed), 1);
        assert_eq!(handler.processed_count(), 1);
        assert!(handler.has_processed("0xabc-0"));

        // Calling again with same event still calls inner (upsert semantics).
        handler.handle(&event, &ctx).await.unwrap();
        assert_eq!(count.load(Ordering::Relaxed), 2);
        // ID is already in the set, count stays at 1 unique.
        assert_eq!(handler.processed_count(), 1);
    }

    #[tokio::test]
    async fn idempotent_handler_tracks_multiple_events() {
        let count = Arc::new(AtomicU32::new(0));
        let inner = Arc::new(CountingHandler {
            count: count.clone(),
            schema: "ERC20Transfer".into(),
        });

        let handler = IdempotentHandler::new(inner, ReplayContext::normal());
        let ctx = dummy_ctx();

        handler.handle(&make_event("0xabc", 0), &ctx).await.unwrap();
        handler.handle(&make_event("0xabc", 1), &ctx).await.unwrap();
        handler.handle(&make_event("0xdef", 0), &ctx).await.unwrap();

        assert_eq!(handler.processed_count(), 3);
        assert!(handler.has_processed("0xabc-0"));
        assert!(handler.has_processed("0xabc-1"));
        assert!(handler.has_processed("0xdef-0"));
        assert!(!handler.has_processed("0xghi-0"));
    }

    /// Replay mode only affects side effects; events are still forwarded to
    /// the inner handler and tracked.
    #[tokio::test]
    async fn idempotent_handler_forwards_events_during_replay() {
        let count = Arc::new(AtomicU32::new(0));
        let inner = Arc::new(CountingHandler {
            count: count.clone(),
            schema: "ERC20Transfer".into(),
        });

        let handler = IdempotentHandler::new(inner, ReplayContext::replay(100, None));
        let ctx = dummy_ctx();

        handler.handle(&make_event("0xabc", 0), &ctx).await.unwrap();

        assert_eq!(count.load(Ordering::Relaxed), 1);
        assert_eq!(handler.processed_count(), 1);
        assert!(handler.has_processed("0xabc-0"));
        assert!(handler.replay_context().is_replay);
        assert_eq!(handler.replay_context().reorg_from_block, Some(100));
    }

    #[test]
    fn idempotent_handler_side_effect_guard_normal() {
        let inner = Arc::new(CountingHandler {
            count: Arc::new(AtomicU32::new(0)),
            schema: "Test".into(),
        });
        let handler = IdempotentHandler::new(inner, ReplayContext::normal());
        assert!(handler.side_effect_guard().should_execute());
    }

    #[test]
    fn idempotent_handler_side_effect_guard_replay() {
        let inner = Arc::new(CountingHandler {
            count: Arc::new(AtomicU32::new(0)),
            schema: "Test".into(),
        });
        let handler = IdempotentHandler::new(inner, ReplayContext::replay(100, None));
        assert!(!handler.side_effect_guard().should_execute());
    }
}