Skip to main content

chainindex_core/
block_handler.rs

1//! Enhanced block handler system — interval handlers, setup handlers,
2//! and execution ordering for block-level operations.
3//!
4//! Block handlers fire for every block. Interval handlers fire every N blocks.
5//! Setup handlers fire once before indexing begins. All block-level handlers
6//! execute BEFORE event handlers for the same block.
7//!
8//! # Example
9//!
10//! ```rust,ignore
11//! use chainindex_core::block_handler::{IntervalHandler, SetupHandler, BlockHandlerScheduler};
12//!
13//! // Snapshot handler fires every 1000 blocks
14//! struct SnapshotHandler;
15//!
16//! #[async_trait::async_trait]
17//! impl IntervalHandler for SnapshotHandler {
18//!     async fn handle(&self, block: &BlockSummary, ctx: &IndexContext) -> Result<(), IndexerError> {
19//!         println!("Taking snapshot at block {}", block.number);
20//!         Ok(())
21//!     }
22//!     fn interval(&self) -> u64 { 1000 }
23//!     fn name(&self) -> &str { "snapshot" }
24//! }
25//! ```
26
27use async_trait::async_trait;
28use std::sync::Arc;
29
30use crate::error::IndexerError;
31use crate::types::{BlockSummary, IndexContext};
32
33// ─── IntervalHandler ─────────────────────────────────────────────────────────
34
35/// Handler that fires every N blocks.
36///
37/// Useful for periodic tasks such as taking snapshots, computing aggregates,
38/// flushing caches, or emitting metrics at regular block intervals.
39#[async_trait]
40pub trait IntervalHandler: Send + Sync {
41    /// Called every `interval()` blocks.
42    ///
43    /// The handler receives the current block summary and indexing context.
44    /// Returning an error will propagate up to the index loop.
45    async fn handle(&self, block: &BlockSummary, ctx: &IndexContext) -> Result<(), IndexerError>;
46
47    /// How often this handler should fire, measured in blocks.
48    ///
49    /// For example, returning `100` means this handler fires on blocks
50    /// 0, 100, 200, 300, etc.
51    fn interval(&self) -> u64;
52
53    /// Human-readable handler name for logging and diagnostics.
54    fn name(&self) -> &str;
55}
56
57// ─── SetupHandler ────────────────────────────────────────────────────────────
58
59/// Handler that fires once before indexing starts.
60///
61/// Use this for one-time initialization tasks such as creating database
62/// tables, registering metrics, or loading reference data.
63#[async_trait]
64pub trait SetupHandler: Send + Sync {
65    /// Called once during indexer initialization, before any blocks are processed.
66    ///
67    /// The context contains the starting block information.
68    async fn setup(&self, ctx: &IndexContext) -> Result<(), IndexerError>;
69
70    /// Human-readable handler name for logging and diagnostics.
71    fn name(&self) -> &str;
72}
73
74// ─── BlockHandlerScheduler ───────────────────────────────────────────────────
75
76/// Manages block-level handler scheduling and execution.
77///
78/// The scheduler maintains a list of interval handlers (which fire every N
79/// blocks) and setup handlers (which fire once). It determines when each
80/// handler should run and executes them in registration order.
81pub struct BlockHandlerScheduler {
82    /// Interval handlers, each with its own cadence.
83    interval_handlers: Vec<Arc<dyn IntervalHandler>>,
84    /// Setup handlers that run once before indexing.
85    setup_handlers: Vec<Arc<dyn SetupHandler>>,
86    /// Whether `run_setup` has already been called.
87    setup_complete: bool,
88}
89
90impl BlockHandlerScheduler {
91    /// Create a new empty scheduler.
92    pub fn new() -> Self {
93        Self {
94            interval_handlers: Vec::new(),
95            setup_handlers: Vec::new(),
96            setup_complete: false,
97        }
98    }
99
100    /// Register an interval handler.
101    ///
102    /// Handlers are executed in registration order when their interval is due.
103    pub fn register_interval(&mut self, handler: Arc<dyn IntervalHandler>) {
104        tracing::debug!(
105            handler = handler.name(),
106            interval = handler.interval(),
107            "registered interval handler"
108        );
109        self.interval_handlers.push(handler);
110    }
111
112    /// Register a setup handler.
113    ///
114    /// Setup handlers run once during `run_setup`, in registration order.
115    pub fn register_setup(&mut self, handler: Arc<dyn SetupHandler>) {
116        tracing::debug!(handler = handler.name(), "registered setup handler");
117        self.setup_handlers.push(handler);
118    }
119
120    /// Run all setup handlers once.
121    ///
122    /// This method is idempotent — calling it more than once has no effect.
123    /// Returns an error if any setup handler fails.
124    pub async fn run_setup(&mut self, ctx: &IndexContext) -> Result<(), IndexerError> {
125        if self.setup_complete {
126            tracing::debug!("setup already complete, skipping");
127            return Ok(());
128        }
129
130        for handler in &self.setup_handlers {
131            tracing::info!(handler = handler.name(), "running setup handler");
132            handler
133                .setup(ctx)
134                .await
135                .map_err(|e| IndexerError::Handler {
136                    handler: handler.name().to_string(),
137                    reason: e.to_string(),
138                })?;
139        }
140
141        self.setup_complete = true;
142        Ok(())
143    }
144
145    /// Run all interval handlers that are due for the given block.
146    ///
147    /// A handler fires when `block.number % handler.interval() == 0`.
148    /// Handlers are executed in registration order.
149    pub async fn run_block(
150        &self,
151        block: &BlockSummary,
152        ctx: &IndexContext,
153    ) -> Result<(), IndexerError> {
154        for handler in &self.interval_handlers {
155            if self.should_run_interval(handler.as_ref(), block.number) {
156                tracing::debug!(
157                    handler = handler.name(),
158                    block = block.number,
159                    "running interval handler"
160                );
161                handler
162                    .handle(block, ctx)
163                    .await
164                    .map_err(|e| IndexerError::Handler {
165                        handler: handler.name().to_string(),
166                        reason: e.to_string(),
167                    })?;
168            }
169        }
170        Ok(())
171    }
172
173    /// Check whether an interval handler should fire at the given block number.
174    ///
175    /// Returns `true` if `block_number % interval == 0`. An interval of 0 is
176    /// treated as "never fire" to avoid division by zero.
177    pub fn should_run_interval(&self, handler: &dyn IntervalHandler, block_number: u64) -> bool {
178        let interval = handler.interval();
179        if interval == 0 {
180            return false;
181        }
182        block_number.is_multiple_of(interval)
183    }
184
185    /// Returns whether setup has been completed.
186    pub fn is_setup_complete(&self) -> bool {
187        self.setup_complete
188    }
189
190    /// Returns the number of registered interval handlers.
191    pub fn interval_handler_count(&self) -> usize {
192        self.interval_handlers.len()
193    }
194
195    /// Returns the number of registered setup handlers.
196    pub fn setup_handler_count(&self) -> usize {
197        self.setup_handlers.len()
198    }
199}
200
201impl Default for BlockHandlerScheduler {
202    fn default() -> Self {
203        Self::new()
204    }
205}
206
207// ─── Tests ───────────────────────────────────────────────────────────────────
208
209#[cfg(test)]
210mod tests {
211    use super::*;
212    use std::sync::atomic::{AtomicU32, Ordering};
213
214    /// Helper: create a dummy IndexContext for testing.
215    fn dummy_ctx() -> IndexContext {
216        IndexContext {
217            block: BlockSummary {
218                number: 0,
219                hash: "0x0".into(),
220                parent_hash: "0x0".into(),
221                timestamp: 0,
222                tx_count: 0,
223            },
224            phase: crate::types::IndexPhase::Backfill,
225            chain: "ethereum".into(),
226        }
227    }
228
229    /// Helper: create a BlockSummary at the given block number.
230    fn block_at(number: u64) -> BlockSummary {
231        BlockSummary {
232            number,
233            hash: format!("0x{:x}", number),
234            parent_hash: format!("0x{:x}", number.saturating_sub(1)),
235            timestamp: number as i64 * 12,
236            tx_count: 0,
237        }
238    }
239
240    /// A test interval handler that counts invocations.
241    struct CountingInterval {
242        count: Arc<AtomicU32>,
243        interval: u64,
244        name: String,
245    }
246
247    impl CountingInterval {
248        fn new(interval: u64, name: &str) -> (Arc<Self>, Arc<AtomicU32>) {
249            let count = Arc::new(AtomicU32::new(0));
250            let handler = Arc::new(Self {
251                count: count.clone(),
252                interval,
253                name: name.to_string(),
254            });
255            (handler, count)
256        }
257    }
258
259    #[async_trait]
260    impl IntervalHandler for CountingInterval {
261        async fn handle(
262            &self,
263            _block: &BlockSummary,
264            _ctx: &IndexContext,
265        ) -> Result<(), IndexerError> {
266            self.count.fetch_add(1, Ordering::Relaxed);
267            Ok(())
268        }
269
270        fn interval(&self) -> u64 {
271            self.interval
272        }
273
274        fn name(&self) -> &str {
275            &self.name
276        }
277    }
278
279    /// A test setup handler that counts invocations.
280    struct CountingSetup {
281        count: Arc<AtomicU32>,
282        name: String,
283    }
284
285    impl CountingSetup {
286        fn new(name: &str) -> (Arc<Self>, Arc<AtomicU32>) {
287            let count = Arc::new(AtomicU32::new(0));
288            let handler = Arc::new(Self {
289                count: count.clone(),
290                name: name.to_string(),
291            });
292            (handler, count)
293        }
294    }
295
296    #[async_trait]
297    impl SetupHandler for CountingSetup {
298        async fn setup(&self, _ctx: &IndexContext) -> Result<(), IndexerError> {
299            self.count.fetch_add(1, Ordering::Relaxed);
300            Ok(())
301        }
302
303        fn name(&self) -> &str {
304            &self.name
305        }
306    }
307
308    /// A failing interval handler for error propagation tests.
309    struct FailingInterval;
310
311    #[async_trait]
312    impl IntervalHandler for FailingInterval {
313        async fn handle(
314            &self,
315            _block: &BlockSummary,
316            _ctx: &IndexContext,
317        ) -> Result<(), IndexerError> {
318            Err(IndexerError::Other("interval handler failed".into()))
319        }
320
321        fn interval(&self) -> u64 {
322            1
323        }
324
325        fn name(&self) -> &str {
326            "failing"
327        }
328    }
329
330    /// A failing setup handler for error propagation tests.
331    struct FailingSetup;
332
333    #[async_trait]
334    impl SetupHandler for FailingSetup {
335        async fn setup(&self, _ctx: &IndexContext) -> Result<(), IndexerError> {
336            Err(IndexerError::Other("setup failed".into()))
337        }
338
339        fn name(&self) -> &str {
340            "failing_setup"
341        }
342    }
343
344    // ── Test: register interval handler ──────────────────────────────────────
345
346    #[test]
347    fn register_interval_handler() {
348        let mut scheduler = BlockHandlerScheduler::new();
349        assert_eq!(scheduler.interval_handler_count(), 0);
350
351        let (handler, _) = CountingInterval::new(10, "test");
352        scheduler.register_interval(handler);
353        assert_eq!(scheduler.interval_handler_count(), 1);
354    }
355
356    // ── Test: interval handler fires at correct interval ─────────────────────
357
358    #[tokio::test]
359    async fn interval_handler_fires_at_correct_interval() {
360        let mut scheduler = BlockHandlerScheduler::new();
361        let (handler, count) = CountingInterval::new(10, "every_10");
362        scheduler.register_interval(handler);
363
364        let ctx = dummy_ctx();
365
366        // Process blocks 0..30 — handler should fire at 0, 10, 20 = 3 times
367        for i in 0..30 {
368            scheduler.run_block(&block_at(i), &ctx).await.unwrap();
369        }
370
371        assert_eq!(count.load(Ordering::Relaxed), 3);
372    }
373
374    // ── Test: setup handler runs once ────────────────────────────────────────
375
376    #[tokio::test]
377    async fn setup_runs_once() {
378        let mut scheduler = BlockHandlerScheduler::new();
379        let (handler, count) = CountingSetup::new("init");
380        scheduler.register_setup(handler);
381
382        let ctx = dummy_ctx();
383
384        // Run setup twice — should only execute handlers once
385        scheduler.run_setup(&ctx).await.unwrap();
386        scheduler.run_setup(&ctx).await.unwrap();
387
388        assert_eq!(count.load(Ordering::Relaxed), 1);
389        assert!(scheduler.is_setup_complete());
390    }
391
392    // ── Test: multiple interval handlers with different intervals ─────────────
393
394    #[tokio::test]
395    async fn multiple_interval_handlers_different_intervals() {
396        let mut scheduler = BlockHandlerScheduler::new();
397
398        let (h5, count5) = CountingInterval::new(5, "every_5");
399        let (h7, count7) = CountingInterval::new(7, "every_7");
400        scheduler.register_interval(h5);
401        scheduler.register_interval(h7);
402
403        let ctx = dummy_ctx();
404
405        // Process blocks 0..35
406        // every_5 fires at: 0, 5, 10, 15, 20, 25, 30 = 7 times
407        // every_7 fires at: 0, 7, 14, 21, 28 = 5 times
408        for i in 0..35 {
409            scheduler.run_block(&block_at(i), &ctx).await.unwrap();
410        }
411
412        assert_eq!(count5.load(Ordering::Relaxed), 7);
413        assert_eq!(count7.load(Ordering::Relaxed), 5);
414    }
415
416    // ── Test: block 0 handling ───────────────────────────────────────────────
417
418    #[tokio::test]
419    async fn block_zero_fires_all_interval_handlers() {
420        let mut scheduler = BlockHandlerScheduler::new();
421
422        let (h100, count100) = CountingInterval::new(100, "every_100");
423        let (h1000, count1000) = CountingInterval::new(1000, "every_1000");
424        scheduler.register_interval(h100);
425        scheduler.register_interval(h1000);
426
427        let ctx = dummy_ctx();
428
429        // Block 0 — all interval handlers should fire (0 % N == 0 for all N)
430        scheduler.run_block(&block_at(0), &ctx).await.unwrap();
431
432        assert_eq!(count100.load(Ordering::Relaxed), 1);
433        assert_eq!(count1000.load(Ordering::Relaxed), 1);
434    }
435
436    // ── Test: handler error propagation (interval) ───────────────────────────
437
438    #[tokio::test]
439    async fn interval_handler_error_propagation() {
440        let mut scheduler = BlockHandlerScheduler::new();
441        scheduler.register_interval(Arc::new(FailingInterval));
442
443        let ctx = dummy_ctx();
444        let result = scheduler.run_block(&block_at(0), &ctx).await;
445
446        assert!(result.is_err());
447        let err = result.unwrap_err();
448        match err {
449            IndexerError::Handler { handler, reason } => {
450                assert_eq!(handler, "failing");
451                assert!(reason.contains("interval handler failed"));
452            }
453            _ => panic!("expected Handler error, got {:?}", err),
454        }
455    }
456
457    // ── Test: setup handler error propagation ────────────────────────────────
458
459    #[tokio::test]
460    async fn setup_handler_error_propagation() {
461        let mut scheduler = BlockHandlerScheduler::new();
462        scheduler.register_setup(Arc::new(FailingSetup));
463
464        let ctx = dummy_ctx();
465        let result = scheduler.run_setup(&ctx).await;
466
467        assert!(result.is_err());
468        assert!(!scheduler.is_setup_complete());
469    }
470
471    // ── Test: zero interval never fires ──────────────────────────────────────
472
473    #[tokio::test]
474    async fn zero_interval_never_fires() {
475        let mut scheduler = BlockHandlerScheduler::new();
476        let (handler, count) = CountingInterval::new(0, "never");
477        scheduler.register_interval(handler);
478
479        let ctx = dummy_ctx();
480
481        for i in 0..100 {
482            scheduler.run_block(&block_at(i), &ctx).await.unwrap();
483        }
484
485        assert_eq!(count.load(Ordering::Relaxed), 0);
486    }
487
488    // ── Test: should_run_interval correctness ────────────────────────────────
489
490    #[test]
491    fn should_run_interval_correctness() {
492        let scheduler = BlockHandlerScheduler::new();
493        let (handler, _) = CountingInterval::new(10, "test");
494
495        assert!(scheduler.should_run_interval(handler.as_ref(), 0));
496        assert!(!scheduler.should_run_interval(handler.as_ref(), 1));
497        assert!(!scheduler.should_run_interval(handler.as_ref(), 9));
498        assert!(scheduler.should_run_interval(handler.as_ref(), 10));
499        assert!(scheduler.should_run_interval(handler.as_ref(), 100));
500        assert!(!scheduler.should_run_interval(handler.as_ref(), 101));
501    }
502
503    // ── Test: multiple setup handlers all run ────────────────────────────────
504
505    #[tokio::test]
506    async fn multiple_setup_handlers_all_run() {
507        let mut scheduler = BlockHandlerScheduler::new();
508
509        let (h1, count1) = CountingSetup::new("setup_a");
510        let (h2, count2) = CountingSetup::new("setup_b");
511        let (h3, count3) = CountingSetup::new("setup_c");
512
513        scheduler.register_setup(h1);
514        scheduler.register_setup(h2);
515        scheduler.register_setup(h3);
516
517        let ctx = dummy_ctx();
518        scheduler.run_setup(&ctx).await.unwrap();
519
520        assert_eq!(count1.load(Ordering::Relaxed), 1);
521        assert_eq!(count2.load(Ordering::Relaxed), 1);
522        assert_eq!(count3.load(Ordering::Relaxed), 1);
523    }
524}