1use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use async_trait::async_trait;
26use futures::future::join_all;
27use serde::{Deserialize, Serialize};
28use tokio::sync::{Mutex, Semaphore};
29use tracing::{debug, error, info, warn};
30
31use crate::error::IndexerError;
32use crate::handler::DecodedEvent;
33use crate::types::EventFilter;
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct BackfillConfig {
40 pub from_block: u64,
42 pub to_block: u64,
44 pub concurrency: usize,
46 pub segment_size: u64,
48 pub batch_size: u64,
50 pub retry_attempts: u32,
52 pub retry_delay: Duration,
54}
55
56impl Default for BackfillConfig {
57 fn default() -> Self {
58 Self {
59 from_block: 0,
60 to_block: 0,
61 concurrency: 4,
62 segment_size: 10_000,
63 batch_size: 500,
64 retry_attempts: 3,
65 retry_delay: Duration::from_secs(1),
66 }
67 }
68}
69
70impl BackfillConfig {
71 pub fn validate(&self) -> Result<(), IndexerError> {
73 if self.from_block > self.to_block {
74 return Err(IndexerError::Other(format!(
75 "backfill config invalid: from_block ({}) > to_block ({})",
76 self.from_block, self.to_block
77 )));
78 }
79 if self.concurrency == 0 {
80 return Err(IndexerError::Other(
81 "backfill config invalid: concurrency must be >= 1".into(),
82 ));
83 }
84 if self.segment_size == 0 {
85 return Err(IndexerError::Other(
86 "backfill config invalid: segment_size must be >= 1".into(),
87 ));
88 }
89 if self.batch_size == 0 {
90 return Err(IndexerError::Other(
91 "backfill config invalid: batch_size must be >= 1".into(),
92 ));
93 }
94 Ok(())
95 }
96
97 pub fn segments(&self) -> Vec<BackfillSegment> {
99 if self.from_block > self.to_block {
100 return vec![];
101 }
102 let mut segments = Vec::new();
103 let mut current = self.from_block;
104 let mut id = 0usize;
105
106 while current <= self.to_block {
107 let end = (current + self.segment_size - 1).min(self.to_block);
108 segments.push(BackfillSegment {
109 id,
110 from_block: current,
111 to_block: end,
112 status: SegmentStatus::Pending,
113 events_processed: 0,
114 duration: None,
115 error: None,
116 });
117 id += 1;
118 match end.checked_add(1) {
120 Some(next) => current = next,
121 None => break,
122 }
123 }
124 segments
125 }
126}
127
128#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
132pub enum SegmentStatus {
133 Pending,
135 InProgress,
137 Complete,
139 Failed,
141}
142
143#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct BackfillSegment {
148 pub id: usize,
150 pub from_block: u64,
152 pub to_block: u64,
154 pub status: SegmentStatus,
156 pub events_processed: u64,
158 pub duration: Option<Duration>,
160 pub error: Option<String>,
162}
163
164impl BackfillSegment {
165 pub fn block_count(&self) -> u64 {
167 self.to_block - self.from_block + 1
168 }
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct BackfillProgress {
176 pub total_segments: usize,
178 pub completed_segments: usize,
180 pub failed_segments: usize,
182 pub total_blocks: u64,
184 pub processed_blocks: u64,
186 pub total_events: u64,
188 pub elapsed: Duration,
190}
191
192impl BackfillProgress {
193 pub fn blocks_per_second(&self) -> f64 {
197 let secs = self.elapsed.as_secs_f64();
198 if secs == 0.0 {
199 return 0.0;
200 }
201 self.processed_blocks as f64 / secs
202 }
203
204 pub fn eta(&self) -> Duration {
208 let remaining = self.total_blocks.saturating_sub(self.processed_blocks);
209 if remaining == 0 {
210 return Duration::ZERO;
211 }
212 let bps = self.blocks_per_second();
213 if bps == 0.0 {
214 return Duration::MAX;
215 }
216 Duration::from_secs_f64(remaining as f64 / bps)
217 }
218
219 pub fn percent_complete(&self) -> f64 {
221 if self.total_blocks == 0 {
222 return 100.0;
223 }
224 (self.processed_blocks as f64 / self.total_blocks as f64) * 100.0
225 }
226}
227
228#[derive(Debug, Clone, Serialize, Deserialize)]
232pub struct BackfillResult {
233 pub segments: Vec<BackfillSegment>,
235 pub total_events: u64,
237 pub total_duration: Duration,
239 pub failed_segments: Vec<usize>,
241}
242
243#[async_trait]
250pub trait BlockDataProvider: Send + Sync {
251 async fn get_events(
253 &self,
254 from: u64,
255 to: u64,
256 filter: &EventFilter,
257 ) -> Result<Vec<DecodedEvent>, IndexerError>;
258
259 async fn get_block(
262 &self,
263 number: u64,
264 ) -> Result<Option<crate::types::BlockSummary>, IndexerError>;
265}
266
267struct EngineState {
271 segments: Vec<BackfillSegment>,
272 events: Vec<Vec<DecodedEvent>>,
274 processed_blocks: u64,
275 total_events: u64,
276 start_time: Instant,
277}
278
279impl EngineState {
280 fn new(segments: Vec<BackfillSegment>) -> Self {
281 let n = segments.len();
282 Self {
283 segments,
284 events: vec![vec![]; n],
285 processed_blocks: 0,
286 total_events: 0,
287 start_time: Instant::now(),
288 }
289 }
290
291 fn progress(&self, total_blocks: u64) -> BackfillProgress {
292 let completed = self
293 .segments
294 .iter()
295 .filter(|s| s.status == SegmentStatus::Complete)
296 .count();
297 let failed = self
298 .segments
299 .iter()
300 .filter(|s| s.status == SegmentStatus::Failed)
301 .count();
302
303 BackfillProgress {
304 total_segments: self.segments.len(),
305 completed_segments: completed,
306 failed_segments: failed,
307 total_blocks,
308 processed_blocks: self.processed_blocks,
309 total_events: self.total_events,
310 elapsed: self.start_time.elapsed(),
311 }
312 }
313}
314
315pub struct BackfillEngine {
321 config: BackfillConfig,
322 provider: Arc<dyn BlockDataProvider>,
323 filter: EventFilter,
324 chain: String,
325 state: Arc<Mutex<EngineState>>,
326 total_blocks: u64,
327}
328
329impl BackfillEngine {
330 pub fn new(
332 config: BackfillConfig,
333 provider: Arc<dyn BlockDataProvider>,
334 filter: EventFilter,
335 chain: impl Into<String>,
336 ) -> Self {
337 let segments = config.segments();
338 let total_blocks = if config.from_block <= config.to_block {
339 config.to_block - config.from_block + 1
340 } else {
341 0
342 };
343 let state = Arc::new(Mutex::new(EngineState::new(segments)));
344 Self {
345 config,
346 provider,
347 filter,
348 chain: chain.into(),
349 state,
350 total_blocks,
351 }
352 }
353
354 pub async fn progress(&self) -> BackfillProgress {
358 self.state.lock().await.progress(self.total_blocks)
359 }
360
361 pub async fn run(&self) -> Result<BackfillResult, IndexerError> {
367 self.config.validate()?;
368
369 let segment_count = {
370 let guard = self.state.lock().await;
371 guard.segments.len()
372 };
373
374 if segment_count == 0 {
375 info!(chain = %self.chain, "backfill: empty range, nothing to do");
376 return Ok(BackfillResult {
377 segments: vec![],
378 total_events: 0,
379 total_duration: Duration::ZERO,
380 failed_segments: vec![],
381 });
382 }
383
384 info!(
385 chain = %self.chain,
386 from = self.config.from_block,
387 to = self.config.to_block,
388 segments = segment_count,
389 concurrency = self.config.concurrency,
390 "backfill: starting"
391 );
392
393 let start = Instant::now();
394 let semaphore = Arc::new(Semaphore::new(self.config.concurrency));
395
396 let mut handles = Vec::with_capacity(segment_count);
398
399 for seg_id in 0..segment_count {
400 let sem = semaphore.clone();
401 let state = self.state.clone();
402 let provider = self.provider.clone();
403 let filter = self.filter.clone();
404 let config = self.config.clone();
405 let chain = self.chain.clone();
406
407 let handle = tokio::spawn(async move {
408 let _permit = sem.acquire().await.expect("semaphore closed");
409
410 {
412 let mut guard = state.lock().await;
413 guard.segments[seg_id].status = SegmentStatus::InProgress;
414 }
415
416 let (from_block, to_block) = {
417 let guard = state.lock().await;
418 (
419 guard.segments[seg_id].from_block,
420 guard.segments[seg_id].to_block,
421 )
422 };
423
424 let seg_start = Instant::now();
425 let mut last_error: Option<String> = None;
426 let mut succeeded = false;
427 let mut collected: Vec<DecodedEvent> = vec![];
428
429 for attempt in 0..=config.retry_attempts {
430 if attempt > 0 {
431 let backoff = config.retry_delay * 2u32.pow(attempt - 1);
433 warn!(
434 chain = %chain,
435 seg = seg_id,
436 attempt,
437 backoff_ms = backoff.as_millis(),
438 "backfill: retrying segment"
439 );
440 tokio::time::sleep(backoff).await;
441 }
442
443 match process_segment(
444 seg_id,
445 from_block,
446 to_block,
447 &config,
448 provider.as_ref(),
449 &filter,
450 &chain,
451 )
452 .await
453 {
454 Ok(events) => {
455 collected = events;
456 succeeded = true;
457 break;
458 }
459 Err(e) => {
460 let msg = e.to_string();
461 error!(
462 chain = %chain,
463 seg = seg_id,
464 attempt,
465 error = %msg,
466 "backfill: segment attempt failed"
467 );
468 last_error = Some(msg);
469 }
470 }
471 }
472
473 let elapsed = seg_start.elapsed();
474 let event_count = collected.len() as u64;
475 let block_count = to_block - from_block + 1;
476
477 {
479 let mut guard = state.lock().await;
480 if succeeded {
481 guard.segments[seg_id].status = SegmentStatus::Complete;
482 guard.segments[seg_id].events_processed = event_count;
483 guard.segments[seg_id].duration = Some(elapsed);
484 guard.events[seg_id] = collected;
485 guard.processed_blocks += block_count;
486 guard.total_events += event_count;
487 debug!(
488 chain = %chain,
489 seg = seg_id,
490 events = event_count,
491 blocks = block_count,
492 elapsed_ms = elapsed.as_millis(),
493 "backfill: segment complete"
494 );
495 } else {
496 guard.segments[seg_id].status = SegmentStatus::Failed;
497 guard.segments[seg_id].error = last_error;
498 guard.segments[seg_id].duration = Some(elapsed);
499 }
500 }
501 });
502
503 handles.push(handle);
504 }
505
506 join_all(handles).await;
507
508 let total_duration = start.elapsed();
509 let guard = self.state.lock().await;
510
511 let total_events = guard.total_events;
512 let failed_segments: Vec<usize> = guard
513 .segments
514 .iter()
515 .filter(|s| s.status == SegmentStatus::Failed)
516 .map(|s| s.id)
517 .collect();
518
519 info!(
520 chain = %self.chain,
521 total_events,
522 failed = failed_segments.len(),
523 elapsed_ms = total_duration.as_millis(),
524 "backfill: complete"
525 );
526
527 Ok(BackfillResult {
528 segments: guard.segments.clone(),
529 total_events,
530 total_duration,
531 failed_segments,
532 })
533 }
534}
535
536async fn process_segment(
539 seg_id: usize,
540 from_block: u64,
541 to_block: u64,
542 config: &BackfillConfig,
543 provider: &dyn BlockDataProvider,
544 filter: &EventFilter,
545 chain: &str,
546) -> Result<Vec<DecodedEvent>, IndexerError> {
547 let mut all_events = Vec::new();
548 let mut batch_from = from_block;
549
550 while batch_from <= to_block {
551 let batch_to = (batch_from + config.batch_size - 1).min(to_block);
552
553 debug!(
554 chain = %chain,
555 seg = seg_id,
556 batch_from,
557 batch_to,
558 "backfill: fetching batch"
559 );
560
561 let events = provider.get_events(batch_from, batch_to, filter).await?;
562 all_events.extend(events);
563
564 match batch_to.checked_add(1) {
565 Some(next) => batch_from = next,
566 None => break,
567 }
568 }
569
570 Ok(all_events)
571}
572
573pub struct SegmentMerger;
581
582impl SegmentMerger {
583 pub fn merge(segments: &[BackfillSegment], events: &[Vec<DecodedEvent>]) -> Vec<DecodedEvent> {
588 assert_eq!(
589 segments.len(),
590 events.len(),
591 "segments and events slices must have equal length"
592 );
593
594 let mut merged: Vec<DecodedEvent> = segments
596 .iter()
597 .zip(events.iter())
598 .filter(|(seg, _)| seg.status == SegmentStatus::Complete)
599 .flat_map(|(_, evts)| evts.iter().cloned())
600 .collect();
601
602 merged.sort_by_key(|e| (e.block_number, e.log_index));
605 merged
606 }
607}
608
609#[cfg(test)]
612mod tests {
613 use super::*;
614 use std::sync::atomic::{AtomicU32, Ordering};
615 use tokio::time::Duration;
616
617 fn make_event(block_number: u64, log_index: u32) -> DecodedEvent {
620 DecodedEvent {
621 chain: "ethereum".into(),
622 schema: "TestEvent".into(),
623 address: "0xdeadbeef".into(),
624 tx_hash: format!("0x{block_number:064x}"),
625 block_number,
626 log_index,
627 fields_json: serde_json::Value::Null,
628 }
629 }
630
631 fn make_segment(id: usize, from: u64, to: u64, status: SegmentStatus) -> BackfillSegment {
632 BackfillSegment {
633 id,
634 from_block: from,
635 to_block: to,
636 status,
637 events_processed: 0,
638 duration: None,
639 error: None,
640 }
641 }
642
643 struct MockProvider {
647 call_count: Arc<AtomicU32>,
649 fail_times: Arc<AtomicU32>,
651 }
652
653 impl MockProvider {
654 fn new() -> Self {
655 Self {
656 call_count: Arc::new(AtomicU32::new(0)),
657 fail_times: Arc::new(AtomicU32::new(0)),
658 }
659 }
660
661 fn with_failures(n: u32) -> Self {
662 let p = Self::new();
663 p.fail_times.store(n, Ordering::SeqCst);
664 p
665 }
666 }
667
668 #[async_trait]
669 impl BlockDataProvider for MockProvider {
670 async fn get_events(
671 &self,
672 from: u64,
673 to: u64,
674 _filter: &EventFilter,
675 ) -> Result<Vec<DecodedEvent>, IndexerError> {
676 self.call_count.fetch_add(1, Ordering::SeqCst);
677
678 let remaining = self.fail_times.load(Ordering::SeqCst);
680 if remaining > 0 {
681 self.fail_times.store(remaining - 1, Ordering::SeqCst);
682 return Err(IndexerError::Rpc("mock RPC error".into()));
683 }
684
685 let events = (from..=to).map(|b| make_event(b, 0)).collect();
687 Ok(events)
688 }
689
690 async fn get_block(
691 &self,
692 number: u64,
693 ) -> Result<Option<crate::types::BlockSummary>, IndexerError> {
694 Ok(Some(crate::types::BlockSummary {
695 number,
696 hash: format!("0x{number:064x}"),
697 parent_hash: format!("0x{:064x}", number.saturating_sub(1)),
698 timestamp: number as i64 * 12,
699 tx_count: 0,
700 }))
701 }
702 }
703
704 #[test]
707 fn segments_exact_multiple() {
708 let cfg = BackfillConfig {
709 from_block: 0,
710 to_block: 29_999,
711 segment_size: 10_000,
712 ..BackfillConfig::default()
713 };
714 let segs = cfg.segments();
715 assert_eq!(segs.len(), 3);
716 assert_eq!(segs[0].from_block, 0);
717 assert_eq!(segs[0].to_block, 9_999);
718 assert_eq!(segs[1].from_block, 10_000);
719 assert_eq!(segs[1].to_block, 19_999);
720 assert_eq!(segs[2].from_block, 20_000);
721 assert_eq!(segs[2].to_block, 29_999);
722 }
723
724 #[test]
725 fn segments_non_multiple_range() {
726 let cfg = BackfillConfig {
727 from_block: 100,
728 to_block: 10_250,
729 segment_size: 10_000,
730 ..BackfillConfig::default()
731 };
732 let segs = cfg.segments();
733 assert_eq!(segs.len(), 2);
734 assert_eq!(segs[0].from_block, 100);
735 assert_eq!(segs[0].to_block, 10_099); assert_eq!(segs[1].from_block, 10_100);
737 assert_eq!(segs[1].to_block, 10_250);
738 }
739
740 #[test]
741 fn segments_single_block() {
742 let cfg = BackfillConfig {
743 from_block: 42,
744 to_block: 42,
745 segment_size: 10_000,
746 ..BackfillConfig::default()
747 };
748 let segs = cfg.segments();
749 assert_eq!(segs.len(), 1);
750 assert_eq!(segs[0].from_block, 42);
751 assert_eq!(segs[0].to_block, 42);
752 assert_eq!(segs[0].block_count(), 1);
753 }
754
755 #[test]
756 fn segments_empty_when_from_gt_to() {
757 let cfg = BackfillConfig {
758 from_block: 100,
759 to_block: 50, segment_size: 10_000,
761 ..BackfillConfig::default()
762 };
763 assert!(cfg.segments().is_empty());
764 }
765
766 #[test]
767 fn segment_ids_are_sequential() {
768 let cfg = BackfillConfig {
769 from_block: 0,
770 to_block: 49_999,
771 segment_size: 10_000,
772 ..BackfillConfig::default()
773 };
774 for (i, seg) in cfg.segments().iter().enumerate() {
775 assert_eq!(seg.id, i);
776 }
777 }
778
779 #[test]
782 fn config_validate_ok() {
783 let cfg = BackfillConfig {
784 from_block: 0,
785 to_block: 1_000,
786 ..BackfillConfig::default()
787 };
788 assert!(cfg.validate().is_ok());
789 }
790
791 #[test]
792 fn config_validate_from_gt_to() {
793 let cfg = BackfillConfig {
794 from_block: 500,
795 to_block: 100,
796 ..BackfillConfig::default()
797 };
798 assert!(cfg.validate().is_err());
799 }
800
801 #[test]
802 fn config_validate_zero_concurrency() {
803 let cfg = BackfillConfig {
804 from_block: 0,
805 to_block: 100,
806 concurrency: 0,
807 ..BackfillConfig::default()
808 };
809 assert!(cfg.validate().is_err());
810 }
811
812 #[test]
813 fn config_validate_zero_segment_size() {
814 let cfg = BackfillConfig {
815 from_block: 0,
816 to_block: 100,
817 segment_size: 0,
818 ..BackfillConfig::default()
819 };
820 assert!(cfg.validate().is_err());
821 }
822
823 #[test]
824 fn config_validate_zero_batch_size() {
825 let cfg = BackfillConfig {
826 from_block: 0,
827 to_block: 100,
828 batch_size: 0,
829 ..BackfillConfig::default()
830 };
831 assert!(cfg.validate().is_err());
832 }
833
834 #[test]
837 fn progress_percent_zero_at_start() {
838 let p = BackfillProgress {
839 total_segments: 10,
840 completed_segments: 0,
841 failed_segments: 0,
842 total_blocks: 100_000,
843 processed_blocks: 0,
844 total_events: 0,
845 elapsed: Duration::from_secs(0),
846 };
847 assert_eq!(p.percent_complete(), 0.0);
848 }
849
850 #[test]
851 fn progress_percent_complete_100() {
852 let p = BackfillProgress {
853 total_segments: 10,
854 completed_segments: 10,
855 failed_segments: 0,
856 total_blocks: 100_000,
857 processed_blocks: 100_000,
858 total_events: 42,
859 elapsed: Duration::from_secs(10),
860 };
861 assert!((p.percent_complete() - 100.0).abs() < f64::EPSILON);
862 }
863
864 #[test]
865 fn progress_blocks_per_second() {
866 let p = BackfillProgress {
867 total_segments: 1,
868 completed_segments: 1,
869 failed_segments: 0,
870 total_blocks: 1000,
871 processed_blocks: 500,
872 total_events: 0,
873 elapsed: Duration::from_secs(5),
874 };
875 assert!((p.blocks_per_second() - 100.0).abs() < 1e-9);
877 }
878
879 #[test]
880 fn progress_blocks_per_second_zero_elapsed() {
881 let p = BackfillProgress {
882 total_segments: 1,
883 completed_segments: 0,
884 failed_segments: 0,
885 total_blocks: 1000,
886 processed_blocks: 0,
887 total_events: 0,
888 elapsed: Duration::from_secs(0),
889 };
890 assert_eq!(p.blocks_per_second(), 0.0);
891 }
892
893 #[test]
894 fn progress_eta_zero_when_done() {
895 let p = BackfillProgress {
896 total_segments: 1,
897 completed_segments: 1,
898 failed_segments: 0,
899 total_blocks: 1000,
900 processed_blocks: 1000,
901 total_events: 0,
902 elapsed: Duration::from_secs(10),
903 };
904 assert_eq!(p.eta(), Duration::ZERO);
905 }
906
907 #[test]
908 fn progress_eta_reasonable() {
909 let p = BackfillProgress {
910 total_segments: 2,
911 completed_segments: 1,
912 failed_segments: 0,
913 total_blocks: 1000,
914 processed_blocks: 500,
915 total_events: 0,
916 elapsed: Duration::from_secs(5),
917 };
918 let eta = p.eta();
920 assert!(eta.as_secs_f64() > 4.9 && eta.as_secs_f64() < 5.1);
921 }
922
923 #[test]
924 fn progress_percent_empty_range() {
925 let p = BackfillProgress {
926 total_segments: 0,
927 completed_segments: 0,
928 failed_segments: 0,
929 total_blocks: 0,
930 processed_blocks: 0,
931 total_events: 0,
932 elapsed: Duration::ZERO,
933 };
934 assert_eq!(p.percent_complete(), 100.0);
936 }
937
938 #[tokio::test]
941 async fn engine_empty_range() {
942 let cfg = BackfillConfig {
943 from_block: 100,
944 to_block: 50, ..BackfillConfig::default()
946 };
947 let provider = Arc::new(MockProvider::new());
948 let engine = BackfillEngine::new(cfg, provider, EventFilter::default(), "ethereum");
949 let result = engine.run().await;
951 assert!(result.is_err());
952 }
953
954 #[tokio::test]
955 async fn engine_single_block_range() {
956 let cfg = BackfillConfig {
957 from_block: 42,
958 to_block: 42,
959 segment_size: 10_000,
960 batch_size: 500,
961 concurrency: 2,
962 retry_attempts: 0,
963 retry_delay: Duration::from_millis(10),
964 };
965 let provider = Arc::new(MockProvider::new());
966 let engine = BackfillEngine::new(cfg, provider, EventFilter::default(), "ethereum");
967 let result = engine.run().await.unwrap();
968
969 assert_eq!(result.segments.len(), 1);
970 assert_eq!(result.total_events, 1); assert!(result.failed_segments.is_empty());
972 }
973
974 #[tokio::test]
975 async fn engine_successful_backfill_counts_events() {
976 let cfg = BackfillConfig {
978 from_block: 0,
979 to_block: 99,
980 segment_size: 10,
981 batch_size: 5,
982 concurrency: 4,
983 retry_attempts: 0,
984 retry_delay: Duration::from_millis(10),
985 };
986 let provider = Arc::new(MockProvider::new());
987 let engine = BackfillEngine::new(cfg, provider, EventFilter::default(), "ethereum");
988 let result = engine.run().await.unwrap();
989
990 assert_eq!(result.segments.len(), 10);
991 assert_eq!(result.total_events, 100);
993 assert!(result.failed_segments.is_empty());
994 assert!(result
995 .segments
996 .iter()
997 .all(|s| s.status == SegmentStatus::Complete));
998 }
999
1000 #[tokio::test]
1001 async fn engine_concurrency_bounded_by_semaphore() {
1002 let cfg = BackfillConfig {
1004 from_block: 0,
1005 to_block: 499,
1006 segment_size: 50,
1007 batch_size: 25,
1008 concurrency: 3,
1009 retry_attempts: 0,
1010 retry_delay: Duration::from_millis(1),
1011 };
1012 let provider = Arc::new(MockProvider::new());
1013 let engine = BackfillEngine::new(cfg, provider.clone(), EventFilter::default(), "ethereum");
1014 let result = engine.run().await.unwrap();
1015
1016 assert_eq!(result.segments.len(), 10);
1018 assert_eq!(result.total_events, 500);
1019 assert!(result.failed_segments.is_empty());
1020 }
1021
1022 #[tokio::test]
1023 async fn engine_retry_succeeds_after_failures() {
1024 let cfg = BackfillConfig {
1026 from_block: 0,
1027 to_block: 9,
1028 segment_size: 10,
1029 batch_size: 10,
1030 concurrency: 1,
1031 retry_attempts: 3,
1032 retry_delay: Duration::from_millis(1),
1033 };
1034 let provider = Arc::new(MockProvider::with_failures(1));
1036 let engine = BackfillEngine::new(cfg, provider, EventFilter::default(), "ethereum");
1037 let result = engine.run().await.unwrap();
1038
1039 assert_eq!(result.segments.len(), 1);
1040 assert_eq!(result.total_events, 10);
1041 assert!(result.failed_segments.is_empty());
1042 assert_eq!(result.segments[0].status, SegmentStatus::Complete);
1043 }
1044
1045 #[tokio::test]
1046 async fn engine_segment_fails_after_all_retries() {
1047 let cfg = BackfillConfig {
1049 from_block: 0,
1050 to_block: 9,
1051 segment_size: 10,
1052 batch_size: 10,
1053 concurrency: 1,
1054 retry_attempts: 2,
1055 retry_delay: Duration::from_millis(1),
1056 };
1057 let provider = Arc::new(MockProvider::with_failures(10));
1059 let engine = BackfillEngine::new(cfg, provider, EventFilter::default(), "ethereum");
1060 let result = engine.run().await.unwrap();
1061
1062 assert_eq!(result.segments.len(), 1);
1063 assert_eq!(result.total_events, 0);
1064 assert_eq!(result.failed_segments, vec![0]);
1065 assert_eq!(result.segments[0].status, SegmentStatus::Failed);
1066 }
1067
1068 #[tokio::test]
1069 async fn engine_large_range_many_segments() {
1070 let cfg = BackfillConfig {
1071 from_block: 0,
1072 to_block: 9_999,
1073 segment_size: 1_000,
1074 batch_size: 200,
1075 concurrency: 5,
1076 retry_attempts: 0,
1077 retry_delay: Duration::from_millis(1),
1078 };
1079 let provider = Arc::new(MockProvider::new());
1080 let engine = BackfillEngine::new(cfg, provider, EventFilter::default(), "ethereum");
1081 let result = engine.run().await.unwrap();
1082
1083 assert_eq!(result.segments.len(), 10);
1084 assert_eq!(result.total_events, 10_000);
1085 assert!(result.failed_segments.is_empty());
1086 }
1087
1088 #[tokio::test]
1089 async fn engine_progress_reflects_completed_segments() {
1090 let cfg = BackfillConfig {
1091 from_block: 0,
1092 to_block: 19,
1093 segment_size: 10,
1094 batch_size: 10,
1095 concurrency: 1,
1096 retry_attempts: 0,
1097 retry_delay: Duration::from_millis(1),
1098 };
1099 let provider = Arc::new(MockProvider::new());
1100 let engine = BackfillEngine::new(cfg, provider, EventFilter::default(), "ethereum");
1101
1102 let pre = engine.progress().await;
1104 assert_eq!(pre.completed_segments, 0);
1105 assert_eq!(pre.processed_blocks, 0);
1106
1107 engine.run().await.unwrap();
1108
1109 let post = engine.progress().await;
1110 assert_eq!(post.completed_segments, 2);
1111 assert_eq!(post.processed_blocks, 20);
1112 assert!((post.percent_complete() - 100.0).abs() < f64::EPSILON);
1113 }
1114
1115 #[test]
1118 fn merger_preserves_block_order() {
1119 let segs = vec![
1120 make_segment(0, 0, 9, SegmentStatus::Complete),
1121 make_segment(1, 10, 19, SegmentStatus::Complete),
1122 ];
1123 let events = vec![
1125 vec![make_event(5, 0), make_event(3, 0)],
1126 vec![make_event(15, 0), make_event(10, 0)],
1127 ];
1128
1129 let merged = SegmentMerger::merge(&segs, &events);
1130 assert_eq!(merged.len(), 4);
1131 assert_eq!(merged[0].block_number, 3);
1132 assert_eq!(merged[1].block_number, 5);
1133 assert_eq!(merged[2].block_number, 10);
1134 assert_eq!(merged[3].block_number, 15);
1135 }
1136
1137 #[test]
1138 fn merger_skips_failed_segments() {
1139 let segs = vec![
1140 make_segment(0, 0, 9, SegmentStatus::Complete),
1141 make_segment(1, 10, 19, SegmentStatus::Failed),
1142 make_segment(2, 20, 29, SegmentStatus::Complete),
1143 ];
1144 let events = vec![
1145 vec![make_event(1, 0)],
1146 vec![make_event(15, 0)], vec![make_event(21, 0)],
1148 ];
1149
1150 let merged = SegmentMerger::merge(&segs, &events);
1151 assert_eq!(merged.len(), 2);
1152 assert!(merged.iter().all(|e| e.block_number != 15));
1153 }
1154
1155 #[test]
1156 fn merger_tiebreaks_by_log_index() {
1157 let segs = vec![make_segment(0, 100, 100, SegmentStatus::Complete)];
1158 let events = vec![vec![
1159 make_event(100, 3),
1160 make_event(100, 1),
1161 make_event(100, 2),
1162 ]];
1163
1164 let merged = SegmentMerger::merge(&segs, &events);
1165 assert_eq!(merged[0].log_index, 1);
1166 assert_eq!(merged[1].log_index, 2);
1167 assert_eq!(merged[2].log_index, 3);
1168 }
1169
1170 #[test]
1171 fn merger_empty_input() {
1172 let merged = SegmentMerger::merge(&[], &[]);
1173 assert!(merged.is_empty());
1174 }
1175}