// chainindex_core/backfill.rs
1//! Parallel backfill engine for the chainindex pipeline.
2//!
3//! Divides a historical block range into fixed-size segments and processes
4//! them concurrently, bounded by a [`tokio::sync::Semaphore`].  Each segment
5//! fetches events in smaller batches via a [`BlockDataProvider`] and retries
6//! on failure with exponential back-off.
7//!
8//! # Example
9//!
10//! ```rust,ignore
11//! let config = BackfillConfig {
12//!     from_block: 0,
13//!     to_block: 1_000_000,
14//!     concurrency: 8,
15//!     ..BackfillConfig::default()
16//! };
17//! let engine = BackfillEngine::new(config, provider, filter, "ethereum".into());
18//! let result = engine.run().await?;
19//! println!("indexed {} events", result.total_events);
20//! ```
21
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use async_trait::async_trait;
26use futures::future::join_all;
27use serde::{Deserialize, Serialize};
28use tokio::sync::{Mutex, Semaphore};
29use tracing::{debug, error, info, warn};
30
31use crate::error::IndexerError;
32use crate::handler::DecodedEvent;
33use crate::types::EventFilter;
34
35// ─── BackfillConfig ───────────────────────────────────────────────────────────
36
/// Configuration for the parallel backfill engine.
///
/// Build via struct-update syntax over [`BackfillConfig::default`] and check
/// invariants with [`BackfillConfig::validate`] before running.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackfillConfig {
    /// First block to index (inclusive).
    pub from_block: u64,
    /// Last block to index (inclusive).  Must be >= `from_block`.
    pub to_block: u64,
    /// Maximum number of segments processed in parallel.  Must be >= 1.
    pub concurrency: usize,
    /// Number of blocks in each parallel work unit (segment).  Must be >= 1.
    pub segment_size: u64,
    /// Number of blocks fetched in a single RPC call within a segment.
    /// Must be >= 1.
    pub batch_size: u64,
    /// How many times to retry a failed segment before giving up.
    pub retry_attempts: u32,
    /// Base delay between retries; doubles on each attempt (exponential back-off).
    pub retry_delay: Duration,
}
55
56impl Default for BackfillConfig {
57    fn default() -> Self {
58        Self {
59            from_block: 0,
60            to_block: 0,
61            concurrency: 4,
62            segment_size: 10_000,
63            batch_size: 500,
64            retry_attempts: 3,
65            retry_delay: Duration::from_secs(1),
66        }
67    }
68}
69
70impl BackfillConfig {
71    /// Validate that the config is internally consistent.
72    pub fn validate(&self) -> Result<(), IndexerError> {
73        if self.from_block > self.to_block {
74            return Err(IndexerError::Other(format!(
75                "backfill config invalid: from_block ({}) > to_block ({})",
76                self.from_block, self.to_block
77            )));
78        }
79        if self.concurrency == 0 {
80            return Err(IndexerError::Other(
81                "backfill config invalid: concurrency must be >= 1".into(),
82            ));
83        }
84        if self.segment_size == 0 {
85            return Err(IndexerError::Other(
86                "backfill config invalid: segment_size must be >= 1".into(),
87            ));
88        }
89        if self.batch_size == 0 {
90            return Err(IndexerError::Other(
91                "backfill config invalid: batch_size must be >= 1".into(),
92            ));
93        }
94        Ok(())
95    }
96
97    /// Divide the configured block range into ordered [`BackfillSegment`]s.
98    pub fn segments(&self) -> Vec<BackfillSegment> {
99        if self.from_block > self.to_block {
100            return vec![];
101        }
102        let mut segments = Vec::new();
103        let mut current = self.from_block;
104        let mut id = 0usize;
105
106        while current <= self.to_block {
107            let end = (current + self.segment_size - 1).min(self.to_block);
108            segments.push(BackfillSegment {
109                id,
110                from_block: current,
111                to_block: end,
112                status: SegmentStatus::Pending,
113                events_processed: 0,
114                duration: None,
115                error: None,
116            });
117            id += 1;
118            // Avoid overflow when to_block == u64::MAX
119            match end.checked_add(1) {
120                Some(next) => current = next,
121                None => break,
122            }
123        }
124        segments
125    }
126}
127
128// ─── SegmentStatus ────────────────────────────────────────────────────────────
129
/// Processing state of a single backfill segment.
///
/// Lifecycle: `Pending` → `InProgress` → `Complete` or `Failed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SegmentStatus {
    /// Not yet started.
    Pending,
    /// Currently being processed by a worker.
    InProgress,
    /// All batches within the segment completed successfully.
    Complete,
    /// All retry attempts exhausted; segment could not be processed.
    Failed,
}
142
143// ─── BackfillSegment ──────────────────────────────────────────────────────────
144
/// A single work unit covering a contiguous block sub-range.
///
/// Produced by [`BackfillConfig::segments`]; both endpoints are inclusive and
/// `from_block <= to_block` holds for every generated segment.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackfillSegment {
    /// Zero-based index within the ordered segment list.
    pub id: usize,
    /// First block (inclusive).
    pub from_block: u64,
    /// Last block (inclusive).
    pub to_block: u64,
    /// Current processing status.
    pub status: SegmentStatus,
    /// Number of events collected from this segment.
    pub events_processed: u64,
    /// Wall-clock time taken to process the segment (set on completion).
    pub duration: Option<Duration>,
    /// Human-readable error message if the segment failed permanently.
    pub error: Option<String>,
}
163
164impl BackfillSegment {
165    /// Total blocks covered by this segment.
166    pub fn block_count(&self) -> u64 {
167        self.to_block - self.from_block + 1
168    }
169}
170
171// ─── BackfillProgress ─────────────────────────────────────────────────────────
172
/// A point-in-time snapshot of backfill progress.
///
/// Obtained from [`BackfillEngine::progress`]; all fields are copies, so the
/// snapshot does not change after it is taken.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackfillProgress {
    /// Total number of segments in the job.
    pub total_segments: usize,
    /// Segments that have finished successfully.
    pub completed_segments: usize,
    /// Segments that exhausted all retries.
    pub failed_segments: usize,
    /// Total blocks in the range (including any failed segments).
    pub total_blocks: u64,
    /// Blocks whose events have been successfully collected.
    pub processed_blocks: u64,
    /// Total events seen so far.
    pub total_events: u64,
    /// Elapsed time since the backfill started.
    pub elapsed: Duration,
}
191
192impl BackfillProgress {
193    /// Throughput in blocks per second over the elapsed window.
194    ///
195    /// Returns `0.0` if elapsed is zero.
196    pub fn blocks_per_second(&self) -> f64 {
197        let secs = self.elapsed.as_secs_f64();
198        if secs == 0.0 {
199            return 0.0;
200        }
201        self.processed_blocks as f64 / secs
202    }
203
204    /// Estimated time remaining based on current throughput.
205    ///
206    /// Returns [`Duration::ZERO`] if already complete or throughput is zero.
207    pub fn eta(&self) -> Duration {
208        let remaining = self.total_blocks.saturating_sub(self.processed_blocks);
209        if remaining == 0 {
210            return Duration::ZERO;
211        }
212        let bps = self.blocks_per_second();
213        if bps == 0.0 {
214            return Duration::MAX;
215        }
216        Duration::from_secs_f64(remaining as f64 / bps)
217    }
218
219    /// Fraction of blocks processed, expressed as a percentage in `[0.0, 100.0]`.
220    pub fn percent_complete(&self) -> f64 {
221        if self.total_blocks == 0 {
222            return 100.0;
223        }
224        (self.processed_blocks as f64 / self.total_blocks as f64) * 100.0
225    }
226}
227
228// ─── BackfillResult ───────────────────────────────────────────────────────────
229
/// Final summary produced by [`BackfillEngine::run`].
///
/// A run that had permanently failed segments is still returned as `Ok`;
/// inspect `failed_segments` to decide whether to re-run those ranges.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackfillResult {
    /// All segments, in order, with their final statuses.
    pub segments: Vec<BackfillSegment>,
    /// Total events collected across all successful segments.
    pub total_events: u64,
    /// Wall-clock time for the entire backfill job.
    pub total_duration: Duration,
    /// IDs of segments that permanently failed (exhausted retries).
    pub failed_segments: Vec<usize>,
}
242
243// ─── BlockDataProvider ────────────────────────────────────────────────────────
244
/// Abstraction over an RPC provider for backfill purposes.
///
/// Implementations should wrap an actual JSON-RPC client (e.g. via
/// `chainrpc`) and translate network errors into [`IndexerError::Rpc`].
/// Implementations must be `Send + Sync`: one instance is shared across all
/// concurrent worker tasks.
#[async_trait]
pub trait BlockDataProvider: Send + Sync {
    /// Fetch decoded events within `[from, to]` (inclusive) matching `filter`.
    ///
    /// # Errors
    ///
    /// Any error returned here is retried by the engine up to the configured
    /// retry budget.
    async fn get_events(
        &self,
        from: u64,
        to: u64,
        filter: &EventFilter,
    ) -> Result<Vec<DecodedEvent>, IndexerError>;

    /// Fetch a single block summary by number.  Returns `None` if the block
    /// does not exist on the node (e.g. beyond the chain head).
    async fn get_block(
        &self,
        number: u64,
    ) -> Result<Option<crate::types::BlockSummary>, IndexerError>;
}
266
267// ─── Internal shared state ────────────────────────────────────────────────────
268
/// Mutable state shared between worker tasks via `Arc<Mutex<>>`.
///
/// All fields are only touched while the mutex is held; workers write their
/// per-segment results here and `run()` reads the totals at the end.
struct EngineState {
    // Segment metadata, indexed by segment id.
    segments: Vec<BackfillSegment>,
    /// Per-segment event buffers; index matches segment id.
    events: Vec<Vec<DecodedEvent>>,
    // Running count of blocks covered by *completed* segments only.
    processed_blocks: u64,
    // Running count of events from completed segments.
    total_events: u64,
    // Set at construction; used to compute `elapsed` in progress snapshots.
    start_time: Instant,
}
278
279impl EngineState {
280    fn new(segments: Vec<BackfillSegment>) -> Self {
281        let n = segments.len();
282        Self {
283            segments,
284            events: vec![vec![]; n],
285            processed_blocks: 0,
286            total_events: 0,
287            start_time: Instant::now(),
288        }
289    }
290
291    fn progress(&self, total_blocks: u64) -> BackfillProgress {
292        let completed = self
293            .segments
294            .iter()
295            .filter(|s| s.status == SegmentStatus::Complete)
296            .count();
297        let failed = self
298            .segments
299            .iter()
300            .filter(|s| s.status == SegmentStatus::Failed)
301            .count();
302
303        BackfillProgress {
304            total_segments: self.segments.len(),
305            completed_segments: completed,
306            failed_segments: failed,
307            total_blocks,
308            processed_blocks: self.processed_blocks,
309            total_events: self.total_events,
310            elapsed: self.start_time.elapsed(),
311        }
312    }
313}
314
315// ─── BackfillEngine ───────────────────────────────────────────────────────────
316
/// Parallel backfill engine.
///
/// Construct with [`BackfillEngine::new`] and execute with [`BackfillEngine::run`].
pub struct BackfillEngine {
    // Validated lazily: `run()` calls `config.validate()` before starting.
    config: BackfillConfig,
    // Shared by every worker task.
    provider: Arc<dyn BlockDataProvider>,
    // Cloned into each worker; passed through to `get_events`.
    filter: EventFilter,
    // Chain label used only for log/trace context.
    chain: String,
    // Worker-shared mutable state (segment statuses, buffers, counters).
    state: Arc<Mutex<EngineState>>,
    // Inclusive size of the configured range; 0 for an inverted range.
    total_blocks: u64,
}
328
329impl BackfillEngine {
330    /// Create a new engine.  The `provider` is shared across all worker tasks.
331    pub fn new(
332        config: BackfillConfig,
333        provider: Arc<dyn BlockDataProvider>,
334        filter: EventFilter,
335        chain: impl Into<String>,
336    ) -> Self {
337        let segments = config.segments();
338        let total_blocks = if config.from_block <= config.to_block {
339            config.to_block - config.from_block + 1
340        } else {
341            0
342        };
343        let state = Arc::new(Mutex::new(EngineState::new(segments)));
344        Self {
345            config,
346            provider,
347            filter,
348            chain: chain.into(),
349            state,
350            total_blocks,
351        }
352    }
353
354    /// Return a snapshot of the current progress.
355    ///
356    /// Safe to call from any task while [`run`] is executing.
357    pub async fn progress(&self) -> BackfillProgress {
358        self.state.lock().await.progress(self.total_blocks)
359    }
360
361    /// Execute the backfill.
362    ///
363    /// Spawns up to `config.concurrency` concurrent worker tasks, each
364    /// processing one segment at a time.  Blocks until all segments have
365    /// either completed or exhausted their retry budget.
366    pub async fn run(&self) -> Result<BackfillResult, IndexerError> {
367        self.config.validate()?;
368
369        let segment_count = {
370            let guard = self.state.lock().await;
371            guard.segments.len()
372        };
373
374        if segment_count == 0 {
375            info!(chain = %self.chain, "backfill: empty range, nothing to do");
376            return Ok(BackfillResult {
377                segments: vec![],
378                total_events: 0,
379                total_duration: Duration::ZERO,
380                failed_segments: vec![],
381            });
382        }
383
384        info!(
385            chain = %self.chain,
386            from = self.config.from_block,
387            to = self.config.to_block,
388            segments = segment_count,
389            concurrency = self.config.concurrency,
390            "backfill: starting"
391        );
392
393        let start = Instant::now();
394        let semaphore = Arc::new(Semaphore::new(self.config.concurrency));
395
396        // Spawn one task per segment; the semaphore bounds parallelism.
397        let mut handles = Vec::with_capacity(segment_count);
398
399        for seg_id in 0..segment_count {
400            let sem = semaphore.clone();
401            let state = self.state.clone();
402            let provider = self.provider.clone();
403            let filter = self.filter.clone();
404            let config = self.config.clone();
405            let chain = self.chain.clone();
406
407            let handle = tokio::spawn(async move {
408                let _permit = sem.acquire().await.expect("semaphore closed");
409
410                // Mark segment as in-progress.
411                {
412                    let mut guard = state.lock().await;
413                    guard.segments[seg_id].status = SegmentStatus::InProgress;
414                }
415
416                let (from_block, to_block) = {
417                    let guard = state.lock().await;
418                    (
419                        guard.segments[seg_id].from_block,
420                        guard.segments[seg_id].to_block,
421                    )
422                };
423
424                let seg_start = Instant::now();
425                let mut last_error: Option<String> = None;
426                let mut succeeded = false;
427                let mut collected: Vec<DecodedEvent> = vec![];
428
429                for attempt in 0..=config.retry_attempts {
430                    if attempt > 0 {
431                        // Exponential back-off: base * 2^(attempt-1)
432                        let backoff = config.retry_delay * 2u32.pow(attempt - 1);
433                        warn!(
434                            chain = %chain,
435                            seg = seg_id,
436                            attempt,
437                            backoff_ms = backoff.as_millis(),
438                            "backfill: retrying segment"
439                        );
440                        tokio::time::sleep(backoff).await;
441                    }
442
443                    match process_segment(
444                        seg_id,
445                        from_block,
446                        to_block,
447                        &config,
448                        provider.as_ref(),
449                        &filter,
450                        &chain,
451                    )
452                    .await
453                    {
454                        Ok(events) => {
455                            collected = events;
456                            succeeded = true;
457                            break;
458                        }
459                        Err(e) => {
460                            let msg = e.to_string();
461                            error!(
462                                chain = %chain,
463                                seg = seg_id,
464                                attempt,
465                                error = %msg,
466                                "backfill: segment attempt failed"
467                            );
468                            last_error = Some(msg);
469                        }
470                    }
471                }
472
473                let elapsed = seg_start.elapsed();
474                let event_count = collected.len() as u64;
475                let block_count = to_block - from_block + 1;
476
477                // Update shared state.
478                {
479                    let mut guard = state.lock().await;
480                    if succeeded {
481                        guard.segments[seg_id].status = SegmentStatus::Complete;
482                        guard.segments[seg_id].events_processed = event_count;
483                        guard.segments[seg_id].duration = Some(elapsed);
484                        guard.events[seg_id] = collected;
485                        guard.processed_blocks += block_count;
486                        guard.total_events += event_count;
487                        debug!(
488                            chain = %chain,
489                            seg = seg_id,
490                            events = event_count,
491                            blocks = block_count,
492                            elapsed_ms = elapsed.as_millis(),
493                            "backfill: segment complete"
494                        );
495                    } else {
496                        guard.segments[seg_id].status = SegmentStatus::Failed;
497                        guard.segments[seg_id].error = last_error;
498                        guard.segments[seg_id].duration = Some(elapsed);
499                    }
500                }
501            });
502
503            handles.push(handle);
504        }
505
506        join_all(handles).await;
507
508        let total_duration = start.elapsed();
509        let guard = self.state.lock().await;
510
511        let total_events = guard.total_events;
512        let failed_segments: Vec<usize> = guard
513            .segments
514            .iter()
515            .filter(|s| s.status == SegmentStatus::Failed)
516            .map(|s| s.id)
517            .collect();
518
519        info!(
520            chain = %self.chain,
521            total_events,
522            failed = failed_segments.len(),
523            elapsed_ms = total_duration.as_millis(),
524            "backfill: complete"
525        );
526
527        Ok(BackfillResult {
528            segments: guard.segments.clone(),
529            total_events,
530            total_duration,
531            failed_segments,
532        })
533    }
534}
535
536/// Process a single segment: fetch events in `batch_size` sub-ranges and
537/// aggregate the results.
538async fn process_segment(
539    seg_id: usize,
540    from_block: u64,
541    to_block: u64,
542    config: &BackfillConfig,
543    provider: &dyn BlockDataProvider,
544    filter: &EventFilter,
545    chain: &str,
546) -> Result<Vec<DecodedEvent>, IndexerError> {
547    let mut all_events = Vec::new();
548    let mut batch_from = from_block;
549
550    while batch_from <= to_block {
551        let batch_to = (batch_from + config.batch_size - 1).min(to_block);
552
553        debug!(
554            chain = %chain,
555            seg = seg_id,
556            batch_from,
557            batch_to,
558            "backfill: fetching batch"
559        );
560
561        let events = provider.get_events(batch_from, batch_to, filter).await?;
562        all_events.extend(events);
563
564        match batch_to.checked_add(1) {
565            Some(next) => batch_from = next,
566            None => break,
567        }
568    }
569
570    Ok(all_events)
571}
572
573// ─── SegmentMerger ────────────────────────────────────────────────────────────
574
575/// Merges results from multiple completed segments into a single, ordered
576/// event stream.
577///
578/// Events are ordered by `(block_number, log_index)` to ensure a stable,
579/// deterministic output regardless of how tasks were scheduled.
580pub struct SegmentMerger;
581
582impl SegmentMerger {
583    /// Merge the per-segment event buffers into one ordered `Vec<DecodedEvent>`.
584    ///
585    /// Only segments with [`SegmentStatus::Complete`] contribute events.
586    /// `segments` and `events` must be parallel slices of the same length.
587    pub fn merge(segments: &[BackfillSegment], events: &[Vec<DecodedEvent>]) -> Vec<DecodedEvent> {
588        assert_eq!(
589            segments.len(),
590            events.len(),
591            "segments and events slices must have equal length"
592        );
593
594        // Collect events from successful segments only, in segment order.
595        let mut merged: Vec<DecodedEvent> = segments
596            .iter()
597            .zip(events.iter())
598            .filter(|(seg, _)| seg.status == SegmentStatus::Complete)
599            .flat_map(|(_, evts)| evts.iter().cloned())
600            .collect();
601
602        // Final sort guarantees strict block/log ordering regardless of
603        // any within-segment ordering that the provider may return.
604        merged.sort_by_key(|e| (e.block_number, e.log_index));
605        merged
606    }
607}
608
609// ─── Tests ────────────────────────────────────────────────────────────────────
610
611#[cfg(test)]
612mod tests {
613    use super::*;
614    use std::sync::atomic::{AtomicU32, Ordering};
615    use tokio::time::Duration;
616
617    // ── Helpers ───────────────────────────────────────────────────────────────
618
    /// Build a minimal `DecodedEvent` carrying the given ordering keys;
    /// all other fields are fixed placeholders.
    fn make_event(block_number: u64, log_index: u32) -> DecodedEvent {
        DecodedEvent {
            chain: "ethereum".into(),
            schema: "TestEvent".into(),
            address: "0xdeadbeef".into(),
            // Deterministic fake hash derived from the block number.
            tx_hash: format!("0x{block_number:064x}"),
            block_number,
            log_index,
            fields_json: serde_json::Value::Null,
        }
    }
630
    /// Build a `BackfillSegment` with the given bounds and status;
    /// counters and timing are zeroed.
    fn make_segment(id: usize, from: u64, to: u64, status: SegmentStatus) -> BackfillSegment {
        BackfillSegment {
            id,
            from_block: from,
            to_block: to,
            status,
            events_processed: 0,
            duration: None,
            error: None,
        }
    }
642
643    // ── MockProvider ──────────────────────────────────────────────────────────
644
    /// A mock provider that returns one event per block in the requested range.
    ///
    /// Counters are atomics so the same instance can be shared across the
    /// engine's concurrent worker tasks.
    struct MockProvider {
        /// Number of times `get_events` was called.
        call_count: Arc<AtomicU32>,
        /// If set, the provider fails this many times before succeeding.
        fail_times: Arc<AtomicU32>,
    }
652
653    impl MockProvider {
654        fn new() -> Self {
655            Self {
656                call_count: Arc::new(AtomicU32::new(0)),
657                fail_times: Arc::new(AtomicU32::new(0)),
658            }
659        }
660
661        fn with_failures(n: u32) -> Self {
662            let p = Self::new();
663            p.fail_times.store(n, Ordering::SeqCst);
664            p
665        }
666    }
667
668    #[async_trait]
669    impl BlockDataProvider for MockProvider {
670        async fn get_events(
671            &self,
672            from: u64,
673            to: u64,
674            _filter: &EventFilter,
675        ) -> Result<Vec<DecodedEvent>, IndexerError> {
676            self.call_count.fetch_add(1, Ordering::SeqCst);
677
678            // Fail if we still have remaining failures to burn.
679            let remaining = self.fail_times.load(Ordering::SeqCst);
680            if remaining > 0 {
681                self.fail_times.store(remaining - 1, Ordering::SeqCst);
682                return Err(IndexerError::Rpc("mock RPC error".into()));
683            }
684
685            // One event per block.
686            let events = (from..=to).map(|b| make_event(b, 0)).collect();
687            Ok(events)
688        }
689
690        async fn get_block(
691            &self,
692            number: u64,
693        ) -> Result<Option<crate::types::BlockSummary>, IndexerError> {
694            Ok(Some(crate::types::BlockSummary {
695                number,
696                hash: format!("0x{number:064x}"),
697                parent_hash: format!("0x{:064x}", number.saturating_sub(1)),
698                timestamp: number as i64 * 12,
699                tx_count: 0,
700            }))
701        }
702    }
703
704    // ── Segment calculation tests ─────────────────────────────────────────────
705
    // Range divides evenly: three full 10k segments, contiguous and inclusive.
    #[test]
    fn segments_exact_multiple() {
        let cfg = BackfillConfig {
            from_block: 0,
            to_block: 29_999,
            segment_size: 10_000,
            ..BackfillConfig::default()
        };
        let segs = cfg.segments();
        assert_eq!(segs.len(), 3);
        assert_eq!(segs[0].from_block, 0);
        assert_eq!(segs[0].to_block, 9_999);
        assert_eq!(segs[1].from_block, 10_000);
        assert_eq!(segs[1].to_block, 19_999);
        assert_eq!(segs[2].from_block, 20_000);
        assert_eq!(segs[2].to_block, 29_999);
    }

    // Non-even range: last segment is truncated to the range end.
    #[test]
    fn segments_non_multiple_range() {
        let cfg = BackfillConfig {
            from_block: 100,
            to_block: 10_250,
            segment_size: 10_000,
            ..BackfillConfig::default()
        };
        let segs = cfg.segments();
        assert_eq!(segs.len(), 2);
        assert_eq!(segs[0].from_block, 100);
        assert_eq!(segs[0].to_block, 10_099); // 100 + 10_000 - 1
        assert_eq!(segs[1].from_block, 10_100);
        assert_eq!(segs[1].to_block, 10_250);
    }

    // Degenerate one-block range still yields exactly one segment.
    #[test]
    fn segments_single_block() {
        let cfg = BackfillConfig {
            from_block: 42,
            to_block: 42,
            segment_size: 10_000,
            ..BackfillConfig::default()
        };
        let segs = cfg.segments();
        assert_eq!(segs.len(), 1);
        assert_eq!(segs[0].from_block, 42);
        assert_eq!(segs[0].to_block, 42);
        assert_eq!(segs[0].block_count(), 1);
    }

    // Inverted range: segments() must return empty rather than panic
    // (validate() is what rejects this configuration).
    #[test]
    fn segments_empty_when_from_gt_to() {
        let cfg = BackfillConfig {
            from_block: 100,
            to_block: 50, // invalid but should not panic
            segment_size: 10_000,
            ..BackfillConfig::default()
        };
        assert!(cfg.segments().is_empty());
    }

    // Segment ids must equal their position in the returned vector, since
    // the engine uses them to index the shared state buffers.
    #[test]
    fn segment_ids_are_sequential() {
        let cfg = BackfillConfig {
            from_block: 0,
            to_block: 49_999,
            segment_size: 10_000,
            ..BackfillConfig::default()
        };
        for (i, seg) in cfg.segments().iter().enumerate() {
            assert_eq!(seg.id, i);
        }
    }
778
779    // ── Config validation tests ───────────────────────────────────────────────
780
    // A well-formed range with default knobs passes validation.
    #[test]
    fn config_validate_ok() {
        let cfg = BackfillConfig {
            from_block: 0,
            to_block: 1_000,
            ..BackfillConfig::default()
        };
        assert!(cfg.validate().is_ok());
    }

    // Inverted range is rejected.
    #[test]
    fn config_validate_from_gt_to() {
        let cfg = BackfillConfig {
            from_block: 500,
            to_block: 100,
            ..BackfillConfig::default()
        };
        assert!(cfg.validate().is_err());
    }

    // Zero workers would deadlock the semaphore; rejected.
    #[test]
    fn config_validate_zero_concurrency() {
        let cfg = BackfillConfig {
            from_block: 0,
            to_block: 100,
            concurrency: 0,
            ..BackfillConfig::default()
        };
        assert!(cfg.validate().is_err());
    }

    // Zero-sized segments make no progress; rejected.
    #[test]
    fn config_validate_zero_segment_size() {
        let cfg = BackfillConfig {
            from_block: 0,
            to_block: 100,
            segment_size: 0,
            ..BackfillConfig::default()
        };
        assert!(cfg.validate().is_err());
    }

    // Zero-sized batches make no progress; rejected.
    #[test]
    fn config_validate_zero_batch_size() {
        let cfg = BackfillConfig {
            from_block: 0,
            to_block: 100,
            batch_size: 0,
            ..BackfillConfig::default()
        };
        assert!(cfg.validate().is_err());
    }
833
834    // ── Progress tracking tests ───────────────────────────────────────────────
835
    // Nothing processed yet => 0% complete.
    #[test]
    fn progress_percent_zero_at_start() {
        let p = BackfillProgress {
            total_segments: 10,
            completed_segments: 0,
            failed_segments: 0,
            total_blocks: 100_000,
            processed_blocks: 0,
            total_events: 0,
            elapsed: Duration::from_secs(0),
        };
        assert_eq!(p.percent_complete(), 0.0);
    }

    // Everything processed => exactly 100%.
    #[test]
    fn progress_percent_complete_100() {
        let p = BackfillProgress {
            total_segments: 10,
            completed_segments: 10,
            failed_segments: 0,
            total_blocks: 100_000,
            processed_blocks: 100_000,
            total_events: 42,
            elapsed: Duration::from_secs(10),
        };
        assert!((p.percent_complete() - 100.0).abs() < f64::EPSILON);
    }

    // Throughput is simply processed / elapsed.
    #[test]
    fn progress_blocks_per_second() {
        let p = BackfillProgress {
            total_segments: 1,
            completed_segments: 1,
            failed_segments: 0,
            total_blocks: 1000,
            processed_blocks: 500,
            total_events: 0,
            elapsed: Duration::from_secs(5),
        };
        // 500 blocks / 5 s = 100 bps
        assert!((p.blocks_per_second() - 100.0).abs() < 1e-9);
    }

    // Zero elapsed must not divide by zero; defined as 0.0.
    #[test]
    fn progress_blocks_per_second_zero_elapsed() {
        let p = BackfillProgress {
            total_segments: 1,
            completed_segments: 0,
            failed_segments: 0,
            total_blocks: 1000,
            processed_blocks: 0,
            total_events: 0,
            elapsed: Duration::from_secs(0),
        };
        assert_eq!(p.blocks_per_second(), 0.0);
    }

    // Finished job => ETA is exactly zero.
    #[test]
    fn progress_eta_zero_when_done() {
        let p = BackfillProgress {
            total_segments: 1,
            completed_segments: 1,
            failed_segments: 0,
            total_blocks: 1000,
            processed_blocks: 1000,
            total_events: 0,
            elapsed: Duration::from_secs(10),
        };
        assert_eq!(p.eta(), Duration::ZERO);
    }

    // Halfway at 100 bps => about 5 seconds remaining.
    #[test]
    fn progress_eta_reasonable() {
        let p = BackfillProgress {
            total_segments: 2,
            completed_segments: 1,
            failed_segments: 0,
            total_blocks: 1000,
            processed_blocks: 500,
            total_events: 0,
            elapsed: Duration::from_secs(5),
        };
        // 500 remaining / 100 bps = 5 s
        let eta = p.eta();
        assert!(eta.as_secs_f64() > 4.9 && eta.as_secs_f64() < 5.1);
    }

    // Empty range edge case: percent_complete is defined as 100%.
    #[test]
    fn progress_percent_empty_range() {
        let p = BackfillProgress {
            total_segments: 0,
            completed_segments: 0,
            failed_segments: 0,
            total_blocks: 0,
            processed_blocks: 0,
            total_events: 0,
            elapsed: Duration::ZERO,
        };
        // Defined as 100% when there is nothing to process.
        assert_eq!(p.percent_complete(), 100.0);
    }
937
938    // ── Engine integration tests ──────────────────────────────────────────────
939
940    #[tokio::test]
941    async fn engine_empty_range() {
942        let cfg = BackfillConfig {
943            from_block: 100,
944            to_block: 50, // intentionally inverted
945            ..BackfillConfig::default()
946        };
947        let provider = Arc::new(MockProvider::new());
948        let engine = BackfillEngine::new(cfg, provider, EventFilter::default(), "ethereum");
949        // Validation should surface the error.
950        let result = engine.run().await;
951        assert!(result.is_err());
952    }
953
954    #[tokio::test]
955    async fn engine_single_block_range() {
956        let cfg = BackfillConfig {
957            from_block: 42,
958            to_block: 42,
959            segment_size: 10_000,
960            batch_size: 500,
961            concurrency: 2,
962            retry_attempts: 0,
963            retry_delay: Duration::from_millis(10),
964        };
965        let provider = Arc::new(MockProvider::new());
966        let engine = BackfillEngine::new(cfg, provider, EventFilter::default(), "ethereum");
967        let result = engine.run().await.unwrap();
968
969        assert_eq!(result.segments.len(), 1);
970        assert_eq!(result.total_events, 1); // one event for block 42
971        assert!(result.failed_segments.is_empty());
972    }
973
974    #[tokio::test]
975    async fn engine_successful_backfill_counts_events() {
976        // Range: 0..=99 → 10 segments of 10 blocks each
977        let cfg = BackfillConfig {
978            from_block: 0,
979            to_block: 99,
980            segment_size: 10,
981            batch_size: 5,
982            concurrency: 4,
983            retry_attempts: 0,
984            retry_delay: Duration::from_millis(10),
985        };
986        let provider = Arc::new(MockProvider::new());
987        let engine = BackfillEngine::new(cfg, provider, EventFilter::default(), "ethereum");
988        let result = engine.run().await.unwrap();
989
990        assert_eq!(result.segments.len(), 10);
991        // Mock returns one event per block.
992        assert_eq!(result.total_events, 100);
993        assert!(result.failed_segments.is_empty());
994        assert!(result
995            .segments
996            .iter()
997            .all(|s| s.status == SegmentStatus::Complete));
998    }
999
    /// Runs a 10-segment backfill under a low `concurrency: 3` cap and checks
    /// that the full range still completes with the expected event count.
    ///
    /// NOTE(review): despite the name, this test only verifies the *outcome*
    /// under a low cap — it never observes how many segments were in flight
    /// at once, so it cannot prove the semaphore actually bounded parallelism
    /// at 3. Proving that would need instrumentation in `MockProvider` (e.g.
    /// a max-in-flight counter) — TODO confirm/extend.
    #[tokio::test]
    async fn engine_concurrency_bounded_by_semaphore() {
        // Larger range with explicit concurrency cap.
        let cfg = BackfillConfig {
            from_block: 0,
            to_block: 499,
            segment_size: 50,
            batch_size: 25,
            concurrency: 3,
            retry_attempts: 0,
            retry_delay: Duration::from_millis(1),
        };
        let provider = Arc::new(MockProvider::new());
        let engine = BackfillEngine::new(cfg, provider.clone(), EventFilter::default(), "ethereum");
        let result = engine.run().await.unwrap();

        // 500 blocks → 10 segments; all should complete.
        assert_eq!(result.segments.len(), 10);
        assert_eq!(result.total_events, 500);
        assert!(result.failed_segments.is_empty());
    }
1021
1022    #[tokio::test]
1023    async fn engine_retry_succeeds_after_failures() {
1024        // Provider fails once, then succeeds — retry_attempts=3 should recover.
1025        let cfg = BackfillConfig {
1026            from_block: 0,
1027            to_block: 9,
1028            segment_size: 10,
1029            batch_size: 10,
1030            concurrency: 1,
1031            retry_attempts: 3,
1032            retry_delay: Duration::from_millis(1),
1033        };
1034        // Fail the first call; subsequent calls succeed.
1035        let provider = Arc::new(MockProvider::with_failures(1));
1036        let engine = BackfillEngine::new(cfg, provider, EventFilter::default(), "ethereum");
1037        let result = engine.run().await.unwrap();
1038
1039        assert_eq!(result.segments.len(), 1);
1040        assert_eq!(result.total_events, 10);
1041        assert!(result.failed_segments.is_empty());
1042        assert_eq!(result.segments[0].status, SegmentStatus::Complete);
1043    }
1044
1045    #[tokio::test]
1046    async fn engine_segment_fails_after_all_retries() {
1047        // Provider always fails — segment should end up as Failed.
1048        let cfg = BackfillConfig {
1049            from_block: 0,
1050            to_block: 9,
1051            segment_size: 10,
1052            batch_size: 10,
1053            concurrency: 1,
1054            retry_attempts: 2,
1055            retry_delay: Duration::from_millis(1),
1056        };
1057        // Fail more times than retry_attempts allows.
1058        let provider = Arc::new(MockProvider::with_failures(10));
1059        let engine = BackfillEngine::new(cfg, provider, EventFilter::default(), "ethereum");
1060        let result = engine.run().await.unwrap();
1061
1062        assert_eq!(result.segments.len(), 1);
1063        assert_eq!(result.total_events, 0);
1064        assert_eq!(result.failed_segments, vec![0]);
1065        assert_eq!(result.segments[0].status, SegmentStatus::Failed);
1066    }
1067
1068    #[tokio::test]
1069    async fn engine_large_range_many_segments() {
1070        let cfg = BackfillConfig {
1071            from_block: 0,
1072            to_block: 9_999,
1073            segment_size: 1_000,
1074            batch_size: 200,
1075            concurrency: 5,
1076            retry_attempts: 0,
1077            retry_delay: Duration::from_millis(1),
1078        };
1079        let provider = Arc::new(MockProvider::new());
1080        let engine = BackfillEngine::new(cfg, provider, EventFilter::default(), "ethereum");
1081        let result = engine.run().await.unwrap();
1082
1083        assert_eq!(result.segments.len(), 10);
1084        assert_eq!(result.total_events, 10_000);
1085        assert!(result.failed_segments.is_empty());
1086    }
1087
1088    #[tokio::test]
1089    async fn engine_progress_reflects_completed_segments() {
1090        let cfg = BackfillConfig {
1091            from_block: 0,
1092            to_block: 19,
1093            segment_size: 10,
1094            batch_size: 10,
1095            concurrency: 1,
1096            retry_attempts: 0,
1097            retry_delay: Duration::from_millis(1),
1098        };
1099        let provider = Arc::new(MockProvider::new());
1100        let engine = BackfillEngine::new(cfg, provider, EventFilter::default(), "ethereum");
1101
1102        // Snapshot before run (nothing processed yet).
1103        let pre = engine.progress().await;
1104        assert_eq!(pre.completed_segments, 0);
1105        assert_eq!(pre.processed_blocks, 0);
1106
1107        engine.run().await.unwrap();
1108
1109        let post = engine.progress().await;
1110        assert_eq!(post.completed_segments, 2);
1111        assert_eq!(post.processed_blocks, 20);
1112        assert!((post.percent_complete() - 100.0).abs() < f64::EPSILON);
1113    }
1114
1115    // ── SegmentMerger tests ───────────────────────────────────────────────────
1116
1117    #[test]
1118    fn merger_preserves_block_order() {
1119        let segs = vec![
1120            make_segment(0, 0, 9, SegmentStatus::Complete),
1121            make_segment(1, 10, 19, SegmentStatus::Complete),
1122        ];
1123        // Provide segment 1's events first to verify sorting.
1124        let events = vec![
1125            vec![make_event(5, 0), make_event(3, 0)],
1126            vec![make_event(15, 0), make_event(10, 0)],
1127        ];
1128
1129        let merged = SegmentMerger::merge(&segs, &events);
1130        assert_eq!(merged.len(), 4);
1131        assert_eq!(merged[0].block_number, 3);
1132        assert_eq!(merged[1].block_number, 5);
1133        assert_eq!(merged[2].block_number, 10);
1134        assert_eq!(merged[3].block_number, 15);
1135    }
1136
1137    #[test]
1138    fn merger_skips_failed_segments() {
1139        let segs = vec![
1140            make_segment(0, 0, 9, SegmentStatus::Complete),
1141            make_segment(1, 10, 19, SegmentStatus::Failed),
1142            make_segment(2, 20, 29, SegmentStatus::Complete),
1143        ];
1144        let events = vec![
1145            vec![make_event(1, 0)],
1146            vec![make_event(15, 0)], // should be excluded
1147            vec![make_event(21, 0)],
1148        ];
1149
1150        let merged = SegmentMerger::merge(&segs, &events);
1151        assert_eq!(merged.len(), 2);
1152        assert!(merged.iter().all(|e| e.block_number != 15));
1153    }
1154
1155    #[test]
1156    fn merger_tiebreaks_by_log_index() {
1157        let segs = vec![make_segment(0, 100, 100, SegmentStatus::Complete)];
1158        let events = vec![vec![
1159            make_event(100, 3),
1160            make_event(100, 1),
1161            make_event(100, 2),
1162        ]];
1163
1164        let merged = SegmentMerger::merge(&segs, &events);
1165        assert_eq!(merged[0].log_index, 1);
1166        assert_eq!(merged[1].log_index, 2);
1167        assert_eq!(merged[2].log_index, 3);
1168    }
1169
1170    #[test]
1171    fn merger_empty_input() {
1172        let merged = SegmentMerger::merge(&[], &[]);
1173        assert!(merged.is_empty());
1174    }
1175}