1use async_trait::async_trait;
28use std::sync::Arc;
29
30use crate::error::IndexerError;
31use crate::types::{BlockSummary, IndexContext};
32
/// A handler that is invoked periodically, keyed on the block number.
///
/// `BlockHandlerScheduler::run_block` calls [`IntervalHandler::handle`] for a
/// block only when the block number is a multiple of
/// [`IntervalHandler::interval`].
#[async_trait]
pub trait IntervalHandler: Send + Sync {
    /// Processes `block` using the shared indexing context `ctx`.
    ///
    /// # Errors
    ///
    /// Any error returned here is wrapped by the scheduler into
    /// `IndexerError::Handler`, tagged with this handler's [`name`](Self::name).
    async fn handle(&self, block: &BlockSummary, ctx: &IndexContext) -> Result<(), IndexerError>;

    /// Number of blocks between invocations.
    ///
    /// The scheduler treats an interval of `0` as "never run".
    fn interval(&self) -> u64;

    /// Name used by the scheduler in log events and in wrapped errors.
    fn name(&self) -> &str;
}
56
/// A handler run by `BlockHandlerScheduler::run_setup`.
///
/// The scheduler runs registered setup handlers in registration order and
/// skips them all once a full setup pass has succeeded.
#[async_trait]
pub trait SetupHandler: Send + Sync {
    /// Performs this handler's setup work using the indexing context `ctx`.
    ///
    /// # Errors
    ///
    /// Any error returned here aborts the setup pass; the scheduler wraps it
    /// into `IndexerError::Handler`, tagged with this handler's
    /// [`name`](Self::name).
    async fn setup(&self, ctx: &IndexContext) -> Result<(), IndexerError>;

    /// Name used by the scheduler in log events and in wrapped errors.
    fn name(&self) -> &str;
}
73
/// Holds registered setup and interval handlers and decides when to run them.
pub struct BlockHandlerScheduler {
    /// Interval handlers, kept (and run) in registration order.
    interval_handlers: Vec<Arc<dyn IntervalHandler>>,
    /// Setup handlers, kept (and run) in registration order.
    setup_handlers: Vec<Arc<dyn SetupHandler>>,
    /// Set to `true` only after every setup handler has succeeded in one pass.
    setup_complete: bool,
}
89
90impl BlockHandlerScheduler {
91 pub fn new() -> Self {
93 Self {
94 interval_handlers: Vec::new(),
95 setup_handlers: Vec::new(),
96 setup_complete: false,
97 }
98 }
99
100 pub fn register_interval(&mut self, handler: Arc<dyn IntervalHandler>) {
104 tracing::debug!(
105 handler = handler.name(),
106 interval = handler.interval(),
107 "registered interval handler"
108 );
109 self.interval_handlers.push(handler);
110 }
111
112 pub fn register_setup(&mut self, handler: Arc<dyn SetupHandler>) {
116 tracing::debug!(handler = handler.name(), "registered setup handler");
117 self.setup_handlers.push(handler);
118 }
119
120 pub async fn run_setup(&mut self, ctx: &IndexContext) -> Result<(), IndexerError> {
125 if self.setup_complete {
126 tracing::debug!("setup already complete, skipping");
127 return Ok(());
128 }
129
130 for handler in &self.setup_handlers {
131 tracing::info!(handler = handler.name(), "running setup handler");
132 handler
133 .setup(ctx)
134 .await
135 .map_err(|e| IndexerError::Handler {
136 handler: handler.name().to_string(),
137 reason: e.to_string(),
138 })?;
139 }
140
141 self.setup_complete = true;
142 Ok(())
143 }
144
145 pub async fn run_block(
150 &self,
151 block: &BlockSummary,
152 ctx: &IndexContext,
153 ) -> Result<(), IndexerError> {
154 for handler in &self.interval_handlers {
155 if self.should_run_interval(handler.as_ref(), block.number) {
156 tracing::debug!(
157 handler = handler.name(),
158 block = block.number,
159 "running interval handler"
160 );
161 handler
162 .handle(block, ctx)
163 .await
164 .map_err(|e| IndexerError::Handler {
165 handler: handler.name().to_string(),
166 reason: e.to_string(),
167 })?;
168 }
169 }
170 Ok(())
171 }
172
173 pub fn should_run_interval(&self, handler: &dyn IntervalHandler, block_number: u64) -> bool {
178 let interval = handler.interval();
179 if interval == 0 {
180 return false;
181 }
182 block_number.is_multiple_of(interval)
183 }
184
185 pub fn is_setup_complete(&self) -> bool {
187 self.setup_complete
188 }
189
190 pub fn interval_handler_count(&self) -> usize {
192 self.interval_handlers.len()
193 }
194
195 pub fn setup_handler_count(&self) -> usize {
197 self.setup_handlers.len()
198 }
199}
200
201impl Default for BlockHandlerScheduler {
202 fn default() -> Self {
203 Self::new()
204 }
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Minimal context; the handlers in these tests never inspect it.
    fn dummy_ctx() -> IndexContext {
        IndexContext {
            block: BlockSummary {
                number: 0,
                hash: "0x0".into(),
                parent_hash: "0x0".into(),
                timestamp: 0,
                tx_count: 0,
            },
            phase: crate::types::IndexPhase::Backfill,
            chain: "ethereum".into(),
        }
    }

    /// Builds a synthetic block summary for the given block number.
    fn block_at(number: u64) -> BlockSummary {
        BlockSummary {
            number,
            hash: format!("0x{:x}", number),
            parent_hash: format!("0x{:x}", number.saturating_sub(1)),
            timestamp: number as i64 * 12,
            tx_count: 0,
        }
    }

    /// Interval handler that counts how many times it was invoked.
    struct CountingInterval {
        count: Arc<AtomicU32>,
        interval: u64,
        name: String,
    }

    impl CountingInterval {
        /// Returns the handler together with a shared handle to its counter.
        fn new(interval: u64, name: &str) -> (Arc<Self>, Arc<AtomicU32>) {
            let count = Arc::new(AtomicU32::new(0));
            let handler = Arc::new(Self {
                count: count.clone(),
                interval,
                name: name.to_string(),
            });
            (handler, count)
        }
    }

    #[async_trait]
    impl IntervalHandler for CountingInterval {
        async fn handle(
            &self,
            _block: &BlockSummary,
            _ctx: &IndexContext,
        ) -> Result<(), IndexerError> {
            self.count.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }

        fn interval(&self) -> u64 {
            self.interval
        }

        fn name(&self) -> &str {
            &self.name
        }
    }

    /// Setup handler that counts how many times it was invoked.
    struct CountingSetup {
        count: Arc<AtomicU32>,
        name: String,
    }

    impl CountingSetup {
        /// Returns the handler together with a shared handle to its counter.
        fn new(name: &str) -> (Arc<Self>, Arc<AtomicU32>) {
            let count = Arc::new(AtomicU32::new(0));
            let handler = Arc::new(Self {
                count: count.clone(),
                name: name.to_string(),
            });
            (handler, count)
        }
    }

    #[async_trait]
    impl SetupHandler for CountingSetup {
        async fn setup(&self, _ctx: &IndexContext) -> Result<(), IndexerError> {
            self.count.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }

        fn name(&self) -> &str {
            &self.name
        }
    }

    /// Interval handler that is due on every block and always fails.
    struct FailingInterval;

    #[async_trait]
    impl IntervalHandler for FailingInterval {
        async fn handle(
            &self,
            _block: &BlockSummary,
            _ctx: &IndexContext,
        ) -> Result<(), IndexerError> {
            Err(IndexerError::Other("interval handler failed".into()))
        }

        fn interval(&self) -> u64 {
            1
        }

        fn name(&self) -> &str {
            "failing"
        }
    }

    /// Setup handler that always fails.
    struct FailingSetup;

    #[async_trait]
    impl SetupHandler for FailingSetup {
        async fn setup(&self, _ctx: &IndexContext) -> Result<(), IndexerError> {
            Err(IndexerError::Other("setup failed".into()))
        }

        fn name(&self) -> &str {
            "failing_setup"
        }
    }

    #[test]
    fn register_interval_handler() {
        let mut scheduler = BlockHandlerScheduler::new();
        assert_eq!(scheduler.interval_handler_count(), 0);

        let (handler, _) = CountingInterval::new(10, "test");
        scheduler.register_interval(handler);
        assert_eq!(scheduler.interval_handler_count(), 1);
    }

    #[tokio::test]
    async fn interval_handler_fires_at_correct_interval() {
        let mut scheduler = BlockHandlerScheduler::new();
        let (handler, count) = CountingInterval::new(10, "every_10");
        scheduler.register_interval(handler);

        let ctx = dummy_ctx();

        for i in 0..30 {
            scheduler.run_block(&block_at(i), &ctx).await.unwrap();
        }

        // Blocks 0, 10 and 20.
        assert_eq!(count.load(Ordering::Relaxed), 3);
    }

    #[tokio::test]
    async fn setup_runs_once() {
        let mut scheduler = BlockHandlerScheduler::new();
        let (handler, count) = CountingSetup::new("init");
        scheduler.register_setup(handler);

        let ctx = dummy_ctx();

        scheduler.run_setup(&ctx).await.unwrap();
        scheduler.run_setup(&ctx).await.unwrap();

        assert_eq!(count.load(Ordering::Relaxed), 1);
        assert!(scheduler.is_setup_complete());
    }

    #[tokio::test]
    async fn multiple_interval_handlers_different_intervals() {
        let mut scheduler = BlockHandlerScheduler::new();

        let (h5, count5) = CountingInterval::new(5, "every_5");
        let (h7, count7) = CountingInterval::new(7, "every_7");
        scheduler.register_interval(h5);
        scheduler.register_interval(h7);

        let ctx = dummy_ctx();

        for i in 0..35 {
            scheduler.run_block(&block_at(i), &ctx).await.unwrap();
        }

        // Multiples of 5 in 0..35: 0, 5, ..., 30; multiples of 7: 0, 7, ..., 28.
        assert_eq!(count5.load(Ordering::Relaxed), 7);
        assert_eq!(count7.load(Ordering::Relaxed), 5);
    }

    #[tokio::test]
    async fn block_zero_fires_all_interval_handlers() {
        let mut scheduler = BlockHandlerScheduler::new();

        let (h100, count100) = CountingInterval::new(100, "every_100");
        let (h1000, count1000) = CountingInterval::new(1000, "every_1000");
        scheduler.register_interval(h100);
        scheduler.register_interval(h1000);

        let ctx = dummy_ctx();

        scheduler.run_block(&block_at(0), &ctx).await.unwrap();

        assert_eq!(count100.load(Ordering::Relaxed), 1);
        assert_eq!(count1000.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn interval_handler_error_propagation() {
        let mut scheduler = BlockHandlerScheduler::new();
        scheduler.register_interval(Arc::new(FailingInterval));

        let ctx = dummy_ctx();
        let result = scheduler.run_block(&block_at(0), &ctx).await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        match err {
            IndexerError::Handler { handler, reason } => {
                assert_eq!(handler, "failing");
                assert!(reason.contains("interval handler failed"));
            }
            _ => panic!("expected Handler error, got {:?}", err),
        }
    }

    #[tokio::test]
    async fn setup_handler_error_propagation() {
        let mut scheduler = BlockHandlerScheduler::new();
        scheduler.register_setup(Arc::new(FailingSetup));

        let ctx = dummy_ctx();
        let result = scheduler.run_setup(&ctx).await;

        assert!(!scheduler.is_setup_complete());
        // Mirror the interval test: the failure must be wrapped with the
        // handler's name, not merely surface as "some error".
        match result {
            Err(IndexerError::Handler { handler, reason }) => {
                assert_eq!(handler, "failing_setup");
                assert!(reason.contains("setup failed"));
            }
            other => panic!("expected Handler error, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn setup_failure_stops_remaining_handlers() {
        let mut scheduler = BlockHandlerScheduler::new();
        let (later, later_count) = CountingSetup::new("after_failure");
        scheduler.register_setup(Arc::new(FailingSetup));
        scheduler.register_setup(later);

        let ctx = dummy_ctx();
        let result = scheduler.run_setup(&ctx).await;

        assert!(result.is_err());
        assert!(!scheduler.is_setup_complete());
        // The handler registered after the failing one must not have run.
        assert_eq!(later_count.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn zero_interval_never_fires() {
        let mut scheduler = BlockHandlerScheduler::new();
        let (handler, count) = CountingInterval::new(0, "never");
        scheduler.register_interval(handler);

        let ctx = dummy_ctx();

        for i in 0..100 {
            scheduler.run_block(&block_at(i), &ctx).await.unwrap();
        }

        assert_eq!(count.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn should_run_interval_correctness() {
        let scheduler = BlockHandlerScheduler::new();
        let (handler, _) = CountingInterval::new(10, "test");

        assert!(scheduler.should_run_interval(handler.as_ref(), 0));
        assert!(!scheduler.should_run_interval(handler.as_ref(), 1));
        assert!(!scheduler.should_run_interval(handler.as_ref(), 9));
        assert!(scheduler.should_run_interval(handler.as_ref(), 10));
        assert!(scheduler.should_run_interval(handler.as_ref(), 100));
        assert!(!scheduler.should_run_interval(handler.as_ref(), 101));
    }

    #[tokio::test]
    async fn multiple_setup_handlers_all_run() {
        let mut scheduler = BlockHandlerScheduler::new();

        let (h1, count1) = CountingSetup::new("setup_a");
        let (h2, count2) = CountingSetup::new("setup_b");
        let (h3, count3) = CountingSetup::new("setup_c");

        scheduler.register_setup(h1);
        scheduler.register_setup(h2);
        scheduler.register_setup(h3);

        let ctx = dummy_ctx();
        scheduler.run_setup(&ctx).await.unwrap();

        assert_eq!(count1.load(Ordering::Relaxed), 1);
        assert_eq!(count2.load(Ordering::Relaxed), 1);
        assert_eq!(count3.load(Ordering::Relaxed), 1);
    }
}