1use std::collections::VecDeque;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use tokio::sync::{mpsc, RwLock};
6use tokio_util::sync::CancellationToken;
7
8use crate::datasource::{Datasource, DatasourceId, UpdateType, Updates};
9use crate::error::IndexerResult;
10use crate::metrics::MetricsCollection;
11
/// Coarse phase of the synchronization state machine tracked in `SyncStateInner`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SyncPhase {
    /// Historical heights are being backfilled toward `target_tip`.
    Backfilling { target_tip: u64 },
    /// Live pass-through is enabled (set by `SyncState::set_pass_through(true)`).
    LivePassthrough,
    /// Set by `reconcile_if_behind` after a reconnection notification, with
    /// `target_tip` being the best height observed at that moment.
    Reconnecting { target_tip: u64 },
}
24
/// Plain-data snapshot of the synchronization state; cloned out of the lock by
/// `SyncState::snapshot`.
#[derive(Debug, Clone)]
struct SyncStateInner {
    /// Current phase of the sync state machine.
    phase: SyncPhase,
    /// Highest height recorded as indexed (advanced by backfill batches and by
    /// the live forwarder).
    last_indexed: u64,
    /// Live updates whose height is not strictly above this value are dropped
    /// by `filter_by_cutoff`.
    cutoff_height: u64,
    /// Tip the backfill is targeting; initialised from the constructor and
    /// refreshed inside the backfill loop.
    safe_tip: u64,
    /// When `true` the live forwarder forwards updates; when `false` it buffers them.
    pass_through_enabled: bool,
}
34
/// Shared handle to the synchronization state, guarded by an async `RwLock`
/// so that several tasks can read and update it.
pub struct SyncState {
    /// The lock-protected state; cloned out as a whole by `snapshot`.
    inner: Arc<RwLock<SyncStateInner>>,
}
39
40impl SyncState {
41 fn new(last_indexed: u64, safe_tip: u64) -> Self {
43 Self {
44 inner: Arc::new(RwLock::new(SyncStateInner {
45 phase: SyncPhase::Backfilling {
46 target_tip: safe_tip,
47 },
48 last_indexed,
49 cutoff_height: 0,
50 safe_tip,
51 pass_through_enabled: false,
52 })),
53 }
54 }
55
56 async fn snapshot(&self) -> SyncStateInner {
58 self.inner.read().await.clone()
59 }
60
61 async fn update<F>(&self, f: F)
63 where
64 F: FnOnce(&mut SyncStateInner),
65 {
66 let mut guard = self.inner.write().await;
67 f(&mut guard);
68 }
69
70 async fn set_last_indexed(&self, height: u64, update_cutoff: bool) {
72 self.update(|state| {
73 state.last_indexed = height;
74 if update_cutoff {
75 state.cutoff_height = height;
76 }
77 })
78 .await;
79 }
80
81 async fn set_cutoff_height(&self, height: u64) {
83 self.update(|state| {
84 state.cutoff_height = height;
85 })
86 .await;
87 }
88
89 async fn set_pass_through(&self, enabled: bool) {
91 self.update(|state| {
92 state.pass_through_enabled = enabled;
93 if enabled {
94 state.phase = SyncPhase::LivePassthrough;
95 }
96 })
97 .await;
98 }
99
100 async fn set_phase(&self, phase: SyncPhase) {
102 self.update(|state| {
103 state.phase = phase;
104 })
105 .await;
106 }
107
108 async fn set_safe_tip(&self, tip: u64) {
110 self.update(|state| {
111 state.safe_tip = tip;
112 })
113 .await;
114 }
115}
116
/// Tunables for `SyncingDatasource`.
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// Maximum number of heights requested from the backfill source per batch.
    pub batch_size: u64,
    /// Maximum number of live updates buffered while pass-through is disabled;
    /// the oldest buffered entry is dropped on overflow.
    pub max_live_buffer: usize,
    // NOTE(review): not read anywhere in the visible code; presumably the distance
    // from the tip at which live consumption should start — confirm against callers.
    pub live_start_distance: u64,
    /// A backfill progress line is logged every this many batches.
    pub progress_log_every_batches: u64,
}
127
128impl Default for SyncConfig {
129 fn default() -> Self {
130 Self {
131 batch_size: 1_000,
132 max_live_buffer: 10_000,
133 live_start_distance: 128,
134 progress_log_every_batches: 10,
135 }
136 }
137}
138
/// Provider of the chain's current best height.
#[async_trait]
pub trait TipSource: Send + Sync {
    /// Returns the best height currently known to the source.
    async fn best_height(&self) -> IndexerResult<u64>;
}
148
/// Storage for the "last indexed height" checkpoint.
#[async_trait]
pub trait CheckpointStore: Send + Sync {
    /// Reads the stored last indexed height.
    async fn last_indexed_height(&self) -> IndexerResult<u64>;
    /// Stores `height` as the last indexed height.
    async fn set_last_indexed_height(&self, height: u64) -> IndexerResult<()>;
}
159
/// Source of historical updates for a closed range of heights.
#[async_trait]
pub trait BackfillSource: Send + Sync {
    /// Backfills the range `start_height_inclusive..=end_height_inclusive`,
    /// emitting the resulting updates through `sender`.
    ///
    /// NOTE(review): callers in this file persist `end_height_inclusive` as the
    /// checkpoint as soon as this returns `Ok`, so implementations presumably
    /// must have sent every update for the range by then — confirm, especially
    /// for the cancellation path.
    async fn backfill_range(
        &self,
        start_height_inclusive: u64,
        end_height_inclusive: u64,
        sender: tokio::sync::mpsc::Sender<(Updates, DatasourceId)>,
        cancellation_token: CancellationToken,
        metrics: Arc<MetricsCollection>,
    ) -> IndexerResult<()>;
}
177
/// Source of live (streaming) updates.
#[async_trait]
pub trait LiveSource: Send + Sync {
    /// Streams live updates into `sender`, starting after
    /// `start_from_height_exclusive`.
    ///
    /// Each `()` sent on `reconnection_notifier` makes the reconciliation
    /// listener check whether the indexer fell behind and backfill the gap.
    /// NOTE(review): presumably implementations send it after every stream
    /// reconnection — confirm against the concrete sources.
    async fn consume_live(
        &self,
        start_from_height_exclusive: u64,
        sender: tokio::sync::mpsc::Sender<(Updates, DatasourceId)>,
        cancellation_token: CancellationToken,
        metrics: Arc<MetricsCollection>,
        reconnection_notifier: tokio::sync::mpsc::Sender<()>,
    ) -> IndexerResult<()>;

    /// Kinds of updates this source produces.
    fn update_types(&self) -> Vec<UpdateType>;
}
197
/// Datasource that backfills historical heights and then hands over to a live
/// stream, reconciling gaps when the live stream signals a reconnection.
#[derive(Clone)]
pub struct SyncingDatasource {
    /// Where the current best height is read from.
    tip: Arc<dyn TipSource>,
    /// Where the last indexed height is loaded from and persisted to.
    checkpoint: Arc<dyn CheckpointStore>,
    /// Provider of historical updates.
    backfill: Arc<dyn BackfillSource>,
    /// Provider of live updates.
    live: Arc<dyn LiveSource>,
    /// Update kinds reported through `Datasource::update_types`.
    update_types: Vec<UpdateType>,
    /// Batch size, buffer and logging tunables.
    config: SyncConfig,
}
207
208impl SyncingDatasource {
209 async fn catch_up_to_current_tip(
210 &self,
211 state: &SyncState,
212 sender: tokio::sync::mpsc::Sender<(Updates, DatasourceId)>,
213 cancellation_token: CancellationToken,
214 metrics: Arc<MetricsCollection>,
215 ) -> IndexerResult<()> {
216 let snapshot = state.snapshot().await;
219 let last_indexed = snapshot.last_indexed;
220 let best_now = self.tip.best_height().await?;
221
222 if best_now > last_indexed {
223 let start = last_indexed.saturating_add(1);
224 let end = best_now;
225 log::info!(
226 "sync: catch-up backfill from {} to current best {} before enabling live",
227 start,
228 end
229 );
230
231 let (bf_tx, mut bf_rx) = mpsc::channel::<(Updates, DatasourceId)>(1_000);
232 let forward_sender_clone = sender.clone();
233 let forward_handle = tokio::spawn(async move {
234 while let Some((u, ds)) = bf_rx.recv().await {
235 let _ = forward_sender_clone.send((u, ds)).await;
236 }
237 });
238
239 self.backfill
240 .backfill_range(
241 start,
242 end,
243 bf_tx,
244 cancellation_token.clone(),
245 metrics.clone(),
246 )
247 .await?;
248 let _ = forward_handle.await;
249
250 self.checkpoint.set_last_indexed_height(end).await?;
251 state.set_last_indexed(end, true).await;
252 metrics
253 .update_gauge("sync_last_indexed", end as f64)
254 .await?;
255 }
256
257 Ok(())
258 }
259 pub fn new(
260 tip: Arc<dyn TipSource>,
261 checkpoint: Arc<dyn CheckpointStore>,
262 backfill: Arc<dyn BackfillSource>,
263 live: Arc<dyn LiveSource>,
264 update_types: Vec<UpdateType>,
265 config: SyncConfig,
266 ) -> Self {
267 Self {
268 tip,
269 checkpoint,
270 backfill,
271 live,
272 update_types,
273 config,
274 }
275 }
276}
277
278impl SyncingDatasource {
279 async fn initialize_state_and_metrics(
280 &self,
281 metrics: Arc<MetricsCollection>,
282 ) -> IndexerResult<(u64, u64, u64)> {
283 let last_indexed = self.checkpoint.last_indexed_height().await?;
284 let best_height = self.tip.best_height().await?;
285 let safe_tip = best_height;
286
287 metrics
288 .update_gauge("sync_best_height", best_height as f64)
289 .await?;
290 metrics
291 .update_gauge("sync_safe_tip", safe_tip as f64)
292 .await?;
293 metrics
294 .update_gauge("sync_last_indexed", last_indexed as f64)
295 .await?;
296
297 Ok((last_indexed, best_height, safe_tip))
298 }
299
300 fn spawn_live_forwarder(
301 &self,
302 live_rx: tokio::sync::mpsc::Receiver<(Updates, DatasourceId)>,
303 state: Arc<SyncState>,
304 forward_sender: tokio::sync::mpsc::Sender<(Updates, DatasourceId)>,
305 checkpoint: Arc<dyn CheckpointStore>,
306 max_buffer: usize,
307 cancellation_token: CancellationToken,
308 metrics: Arc<MetricsCollection>,
309 ) {
310 let live_token_forwarder = cancellation_token;
311 let live_metrics_forwarder = metrics;
312
313 tokio::spawn(async move {
314 let mut last_persisted_height: u64 = match checkpoint.last_indexed_height().await {
316 Ok(h) => h,
317 Err(_) => 0,
318 };
319 let mut buffer: VecDeque<(Updates, DatasourceId)> = VecDeque::new();
320 let mut flushed_after_enable = false;
321 let mut live_rx = live_rx;
322 loop {
323 tokio::select! {
324 _ = live_token_forwarder.cancelled() => {
325 break;
326 }
327 maybe_msg = live_rx.recv() => {
328 match maybe_msg {
329 Some((updates, datasource_id)) => {
330 let snapshot = state.snapshot().await;
331 if snapshot.pass_through_enabled {
332 if !flushed_after_enable {
333 while let Some((mut u, ds_id)) = buffer.pop_front() {
335 if let Some(filtered) = filter_by_cutoff(&mut u, snapshot.cutoff_height) {
336 if let Some(max_h) = max_height_in_updates(&filtered) {
337 if max_h > last_persisted_height {
338 let _ = checkpoint.set_last_indexed_height(max_h).await;
339 let _ = live_metrics_forwarder.update_gauge("sync_last_indexed", max_h as f64).await;
340 last_persisted_height = max_h;
341 state.set_last_indexed(max_h, false).await;
343 }
344 }
345 let _ = forward_sender.send((filtered, ds_id)).await;
346 }
347 let _ = live_metrics_forwarder.increment_counter("sync_live_buffer_flushed", 1).await;
348 }
349 flushed_after_enable = true;
350 }
351 let current_snapshot = state.snapshot().await;
353 let mut u = updates;
354 if let Some(filtered) = filter_by_cutoff(&mut u, current_snapshot.cutoff_height) {
355 if let Some(max_h) = max_height_in_updates(&filtered) {
356 if max_h > last_persisted_height {
357 let _ = checkpoint.set_last_indexed_height(max_h).await;
358 let _ = live_metrics_forwarder.update_gauge("sync_last_indexed", max_h as f64).await;
359 last_persisted_height = max_h;
360 state.set_last_indexed(max_h, false).await;
362 }
363 }
364 let _ = forward_sender.send((filtered, datasource_id)).await;
365 }
366 } else {
367 if buffer.len() >= max_buffer {
368 buffer.pop_front();
369 let _ = live_metrics_forwarder.increment_counter("sync_live_buffer_overflow", 1).await;
370 }
371 buffer.push_back((updates, datasource_id));
372 let _ = live_metrics_forwarder.increment_counter("sync_live_buffered", 1).await;
373 }
374 }
375 None => break,
376 }
377 }
378 }
379 }
380 });
381 }
382
383 fn spawn_live_stream(
384 &self,
385 state: Arc<SyncState>,
386 live_tx: tokio::sync::mpsc::Sender<(Updates, DatasourceId)>,
387 cancellation_token: CancellationToken,
388 metrics: Arc<MetricsCollection>,
389 reconnection_notifier: tokio::sync::mpsc::Sender<()>,
390 ) {
391 let live_source = Arc::clone(&self.live);
392 tokio::spawn(async move {
393 let snapshot = state.snapshot().await;
395 let start_from_height_exclusive = snapshot.last_indexed;
396 let _ = live_source
397 .consume_live(
398 start_from_height_exclusive,
399 live_tx,
400 cancellation_token,
401 metrics,
402 reconnection_notifier,
403 )
404 .await;
405 });
406 }
407
408 fn spawn_reconciliation_listener(
409 &self,
410 state: Arc<SyncState>,
411 sender: mpsc::Sender<(Updates, DatasourceId)>,
412 cancellation_token: CancellationToken,
413 metrics: Arc<MetricsCollection>,
414 reconnect_rx: mpsc::Receiver<()>,
415 ) {
416 let datasource_for_reconcile = self.clone();
417 let state_reconcile = Arc::clone(&state);
418 let sender_reconcile = sender;
419 let cancel_reconcile = cancellation_token;
420 let metrics_reconcile = metrics;
421 let mut reconnect_rx = reconnect_rx;
422
423 tokio::spawn(async move {
424 loop {
425 tokio::select! {
426 _ = cancel_reconcile.cancelled() => {
427 break;
428 }
429 maybe_notification = reconnect_rx.recv() => {
430 match maybe_notification {
431 Some(()) => {
432 if let Err(e) = datasource_for_reconcile.reconcile_if_behind(
433 &state_reconcile,
434 sender_reconcile.clone(),
435 cancel_reconcile.clone(),
436 metrics_reconcile.clone(),
437 ).await {
438 log::error!("sync: reconciliation error: {}", e);
439 }
440 }
441 None => break,
442 }
443 }
444 }
445 }
446 });
447 }
448
449 async fn backfill_if_needed(
450 &self,
451 state: &SyncState,
452 final_target_tip: u64,
453 sender: tokio::sync::mpsc::Sender<(Updates, DatasourceId)>,
454 cancellation_token: CancellationToken,
455 metrics: Arc<MetricsCollection>,
456 ) -> IndexerResult<()> {
457 let snapshot = state.snapshot().await;
458 let last_indexed = snapshot.last_indexed;
459
460 if last_indexed < final_target_tip {
461 log::info!(
462 "sync: entering backfill mode from {} toward safe tip (best={}, initial_safe_tip={})",
463 last_indexed + 1,
464 final_target_tip,
465 final_target_tip
466 );
467
468 state
469 .set_phase(SyncPhase::Backfilling {
470 target_tip: final_target_tip,
471 })
472 .await;
473 let mut current = last_indexed.saturating_add(1);
474 let mut batches_processed: u64 = 0;
475 let start_time = std::time::Instant::now();
476 loop {
477 if cancellation_token.is_cancelled() {
478 log::info!("sync: cancellation received during backfill");
479 return Ok(());
480 }
481
482 let best_height_now = self.tip.best_height().await?;
484 metrics
485 .update_gauge("sync_best_height", best_height_now as f64)
486 .await?;
487 metrics
488 .update_gauge("sync_safe_tip", final_target_tip as f64)
489 .await?;
490 state.set_safe_tip(final_target_tip).await;
491
492 if current > final_target_tip {
493 break;
494 }
495
496 let end = std::cmp::min(
497 final_target_tip,
498 current.saturating_add(self.config.batch_size.saturating_sub(1)),
499 );
500
501 metrics
502 .increment_counter("sync_backfill_batches", 1)
503 .await?;
504 batches_processed = batches_processed.saturating_add(1);
505
506 let (bf_tx, mut bf_rx) = mpsc::channel::<(Updates, DatasourceId)>(1_000);
507 let forward_sender_clone = sender.clone();
508 let forward_handle = tokio::spawn(async move {
509 while let Some((u, ds)) = bf_rx.recv().await {
510 let _ = forward_sender_clone.send((u, ds)).await;
511 }
512 });
513
514 self.backfill
515 .backfill_range(
516 current,
517 end,
518 bf_tx,
519 cancellation_token.clone(),
520 metrics.clone(),
521 )
522 .await?;
523 let _ = forward_handle.await;
524
525 self.checkpoint.set_last_indexed_height(end).await?;
526 state.set_last_indexed(end, true).await;
527 metrics
528 .update_gauge("sync_last_indexed", end as f64)
529 .await?;
530
531 current = end.saturating_add(1);
532
533 if batches_processed % self.config.progress_log_every_batches == 0 {
535 let snapshot = state.snapshot().await;
536 let elapsed = start_time.elapsed().as_secs_f64();
537 let heights_done = (snapshot
538 .last_indexed
539 .saturating_sub(current.saturating_sub(1)))
540 .max(1);
541 let total_remaining = final_target_tip.saturating_sub(snapshot.last_indexed);
542 let rate_hps = (heights_done as f64 / elapsed.max(0.001)).max(0.000_001);
543 let eta_secs = (total_remaining as f64 / rate_hps).round() as u64;
544 log::info!(
545 "sync: backfill progress heights=[{}..{}], remaining={}, rate={:.0} h/s, eta={}s",
546 current.saturating_sub(self.config.batch_size.saturating_sub(1)),
547 snapshot.last_indexed,
548 total_remaining,
549 rate_hps,
550 eta_secs
551 );
552 }
553
554 }
557 } else {
558 log::info!(
559 "sync: already at or past safe tip (last_indexed={}, safe_tip={}), entering live",
560 last_indexed,
561 final_target_tip
562 );
563 }
564
565 Ok(())
566 }
567
568 async fn enable_live_passthrough(
569 &self,
570 state: &SyncState,
571 metrics: Arc<MetricsCollection>,
572 ) -> IndexerResult<()> {
573 let snapshot = state.snapshot().await;
575 state.set_cutoff_height(snapshot.last_indexed).await;
576 state.set_pass_through(true).await;
577 metrics
578 .update_gauge("sync_live_passthrough_enabled", 1.0)
579 .await?;
580 log::info!("sync: live pass-through enabled; backfill complete");
581 Ok(())
582 }
583
584 async fn reconcile_if_behind(
585 &self,
586 state: &SyncState,
587 sender: mpsc::Sender<(Updates, DatasourceId)>,
588 cancellation_token: CancellationToken,
589 metrics: Arc<MetricsCollection>,
590 ) -> IndexerResult<()> {
591 let snapshot = state.snapshot().await;
592 let last = snapshot.last_indexed;
593 let best = self.tip.best_height().await?;
594
595 if best > last {
596 log::info!(
597 "sync: reconciliation triggered after reconnection (last={}, best={})",
598 last,
599 best
600 );
601
602 state.set_pass_through(false).await;
604 state
605 .set_phase(SyncPhase::Reconnecting { target_tip: best })
606 .await;
607 metrics
608 .update_gauge("sync_live_passthrough_enabled", 0.0)
609 .await?;
610
611 state.set_cutoff_height(last).await;
613
614 self.backfill_if_needed(
616 state,
617 best,
618 sender.clone(),
619 cancellation_token.clone(),
620 metrics.clone(),
621 )
622 .await?;
623
624 self.catch_up_to_current_tip(
626 state,
627 sender.clone(),
628 cancellation_token.clone(),
629 metrics.clone(),
630 )
631 .await?;
632
633 self.enable_live_passthrough(state, metrics.clone()).await?;
635
636 let final_snapshot = state.snapshot().await;
637 log::info!(
638 "sync: reconciliation complete (last_indexed={})",
639 final_snapshot.last_indexed
640 );
641 }
642
643 Ok(())
644 }
645}
646
#[async_trait]
impl Datasource for SyncingDatasource {
    /// Runs the full sync lifecycle: starts the live forwarder, the
    /// reconciliation listener and the live stream, backfills up to the safe
    /// tip, catches up to the current tip, enables live pass-through, and then
    /// waits until `cancellation_token` is cancelled.
    ///
    /// Any error from the initial state load, the backfill, the catch-up or
    /// the pass-through switch is returned to the caller.
    async fn consume(
        &self,
        _id: DatasourceId,
        sender: mpsc::Sender<(Updates, DatasourceId)>,
        cancellation_token: CancellationToken,
        metrics: Arc<MetricsCollection>,
    ) -> IndexerResult<()> {
        let (last_indexed, _best_height, safe_tip) =
            self.initialize_state_and_metrics(metrics.clone()).await?;

        let state = Arc::new(SyncState::new(last_indexed, safe_tip));

        // The forwarder is started before the live stream so that live updates
        // are buffered from the first message on while the backfill runs.
        let (live_tx, live_rx) = mpsc::channel::<(Updates, DatasourceId)>(1_000);
        self.spawn_live_forwarder(
            live_rx,
            Arc::clone(&state),
            sender.clone(),
            Arc::clone(&self.checkpoint),
            self.config.max_live_buffer,
            cancellation_token.clone(),
            metrics.clone(),
        );

        // Channel on which the live source reports reconnections.
        let (reconnect_tx, reconnect_rx) = mpsc::channel::<()>(10);

        self.spawn_reconciliation_listener(
            Arc::clone(&state),
            sender.clone(),
            cancellation_token.clone(),
            metrics.clone(),
            reconnect_rx,
        );

        self.spawn_live_stream(
            Arc::clone(&state),
            live_tx.clone(),
            cancellation_token.clone(),
            metrics.clone(),
            reconnect_tx,
        );

        // Historical range first, up to the safe tip captured at start-up...
        self.backfill_if_needed(
            &state,
            safe_tip,
            sender.clone(),
            cancellation_token.clone(),
            metrics.clone(),
        )
        .await?;

        // ...then whatever the chain produced while the backfill was running.
        self.catch_up_to_current_tip(
            &state,
            sender.clone(),
            cancellation_token.clone(),
            metrics.clone(),
        )
        .await?;

        self.enable_live_passthrough(&state, metrics.clone())
            .await?;

        // Keep the datasource alive until shutdown is requested.
        cancellation_token.cancelled().await;
        Ok(())
    }

    /// Returns the update kinds configured at construction time.
    fn update_types(&self) -> Vec<UpdateType> {
        self.update_types.clone()
    }
}
726
727fn filter_by_cutoff(updates: &mut Updates, cutoff_height: u64) -> Option<Updates> {
729 match updates {
730 Updates::Transactions(txs) => {
731 let mut filtered = Vec::new();
732 for t in txs.iter() {
733 if t.height > cutoff_height {
734 filtered.push(t.clone());
735 }
736 }
737 if filtered.is_empty() {
738 None
739 } else {
740 Some(Updates::Transactions(filtered))
741 }
742 }
743 Updates::BlockDetails(blocks) => {
744 let filtered: Vec<crate::datasource::BlockDetails> = blocks
745 .iter()
746 .cloned()
747 .filter(|b| b.height > cutoff_height)
748 .collect();
749 if filtered.is_empty() {
750 None
751 } else {
752 Some(Updates::BlockDetails(filtered))
753 }
754 }
755 Updates::Accounts(accounts) => {
756 let filtered: Vec<crate::datasource::AccountUpdate> = accounts
757 .iter()
758 .cloned()
759 .filter(|a| a.height > cutoff_height)
760 .collect();
761 if filtered.is_empty() {
762 None
763 } else {
764 Some(Updates::Accounts(filtered))
765 }
766 }
767 Updates::AccountDeletions(deletions) => {
768 let filtered: Vec<crate::datasource::AccountDeletion> = deletions
769 .iter()
770 .cloned()
771 .filter(|d| d.height > cutoff_height)
772 .collect();
773 if filtered.is_empty() {
774 None
775 } else {
776 Some(Updates::AccountDeletions(filtered))
777 }
778 }
779 Updates::BitcoinBlocks(blocks) => {
780 let filtered: Vec<crate::datasource::BitcoinBlock> = blocks
781 .iter()
782 .cloned()
783 .filter(|b| b.block_height > cutoff_height)
784 .collect();
785 if filtered.is_empty() {
786 None
787 } else {
788 Some(Updates::BitcoinBlocks(filtered))
789 }
790 }
791 Updates::RolledbackTransactions(events) => {
792 Some(Updates::RolledbackTransactions(events.clone()))
793 }
794 Updates::ReappliedTransactions(events) => {
795 Some(Updates::ReappliedTransactions(events.clone()))
796 }
797 }
798}
799
800fn max_height_in_updates(updates: &Updates) -> Option<u64> {
802 match updates {
803 Updates::Transactions(txs) => txs.iter().map(|t| t.height).max(),
804 Updates::BlockDetails(blocks) => blocks.iter().map(|b| b.height).max(),
805 Updates::Accounts(accts) => accts.iter().map(|a| a.height).max(),
806 Updates::AccountDeletions(dels) => dels.iter().map(|d| d.height).max(),
807 Updates::BitcoinBlocks(blocks) => blocks.iter().map(|b| b.block_height).max(),
808 Updates::RolledbackTransactions(_) => None,
809 Updates::ReappliedTransactions(_) => None,
810 }
811}
812
813#[cfg(test)]
814mod tests {
815 use std::sync::atomic::{AtomicU64, Ordering};
816 use std::sync::Arc;
817 use tokio::sync::mpsc;
818 use tokio_util::sync::CancellationToken;
819
820 use super::*;
821 use crate::datasource::{
822 AccountDeletion, AccountUpdate, BitcoinBlock, BlockDetails, TransactionUpdate, UpdateType,
823 };
824 use crate::error::IndexerResult;
825 use crate::metrics::MetricsCollection;
826
    /// Test double for `TipSource` backed by an atomic height.
    struct MockTipSource {
        /// Height reported by `best_height`; changeable through `set_height`.
        height: Arc<AtomicU64>,
    }
832
833 impl MockTipSource {
834 fn new(height: u64) -> Self {
835 Self {
836 height: Arc::new(AtomicU64::new(height)),
837 }
838 }
839
840 fn set_height(&self, height: u64) {
841 self.height.store(height, Ordering::SeqCst);
842 }
843 }
844
845 #[async_trait]
846 impl TipSource for MockTipSource {
847 async fn best_height(&self) -> IndexerResult<u64> {
848 Ok(self.height.load(Ordering::SeqCst))
849 }
850 }
851
    /// In-memory test double for `CheckpointStore`.
    struct MockCheckpointStore {
        /// The stored "last indexed height".
        last_indexed: Arc<AtomicU64>,
    }
855
856 impl MockCheckpointStore {
857 fn new(last_indexed: u64) -> Self {
858 Self {
859 last_indexed: Arc::new(AtomicU64::new(last_indexed)),
860 }
861 }
862
863 fn get_last_indexed(&self) -> u64 {
864 self.last_indexed.load(Ordering::SeqCst)
865 }
866 }
867
868 #[async_trait]
869 impl CheckpointStore for MockCheckpointStore {
870 async fn last_indexed_height(&self) -> IndexerResult<u64> {
871 Ok(self.last_indexed.load(Ordering::SeqCst))
872 }
873
874 async fn set_last_indexed_height(&self, height: u64) -> IndexerResult<()> {
875 self.last_indexed.store(height, Ordering::SeqCst);
876 Ok(())
877 }
878 }
879
    /// Test double for `BackfillSource` that replays canned updates.
    struct MockBackfillSource {
        /// `(start, end, updates)` entries; an entry is replayed when a
        /// backfill request asks for exactly that `start..=end` range.
        updates: Vec<(u64, u64, Updates)>, }
883
884 impl MockBackfillSource {
885 fn new() -> Self {
886 Self {
887 updates: Vec::new(),
888 }
889 }
890
891 fn add_range(&mut self, start: u64, end: u64, updates: Updates) {
892 self.updates.push((start, end, updates));
893 }
894 }
895
896 #[async_trait]
897 impl BackfillSource for MockBackfillSource {
898 async fn backfill_range(
899 &self,
900 start_height_inclusive: u64,
901 end_height_inclusive: u64,
902 sender: mpsc::Sender<(Updates, DatasourceId)>,
903 _cancellation_token: CancellationToken,
904 _metrics: Arc<MetricsCollection>,
905 ) -> IndexerResult<()> {
906 for (start, end, updates) in &self.updates {
907 if *start == start_height_inclusive && *end == end_height_inclusive {
908 let _ = sender
909 .send((updates.clone(), DatasourceId::new_unique()))
910 .await;
911 return Ok(());
912 }
913 }
914 Ok(())
915 }
916 }
917
    /// Test double for `LiveSource` that replays a fixed list of updates.
    struct MockLiveSource {
        /// Updates emitted, in order, by `consume_live`.
        updates: Vec<Updates>,
        /// When `true`, a reconnection notification is sent after the updates.
        should_reconnect: bool,
    }
922
923 impl MockLiveSource {
924 fn new() -> Self {
925 Self {
926 updates: Vec::new(),
927 should_reconnect: false,
928 }
929 }
930
931 fn add_update(&mut self, update: Updates) {
932 self.updates.push(update);
933 }
934
935 fn set_should_reconnect(&mut self, should: bool) {
936 self.should_reconnect = should;
937 }
938 }
939
940 #[async_trait]
941 impl LiveSource for MockLiveSource {
942 async fn consume_live(
943 &self,
944 _start_from_height_exclusive: u64,
945 sender: mpsc::Sender<(Updates, DatasourceId)>,
946 cancellation_token: CancellationToken,
947 _metrics: Arc<MetricsCollection>,
948 reconnection_notifier: mpsc::Sender<()>,
949 ) -> IndexerResult<()> {
950 let updates = self.updates.clone();
951 let should_reconnect = self.should_reconnect;
952 tokio::spawn(async move {
953 for update in updates {
954 if cancellation_token.is_cancelled() {
955 break;
956 }
957 let _ = sender.send((update, DatasourceId::new_unique())).await;
958 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
959 }
960 if should_reconnect {
961 let _ = reconnection_notifier.send(()).await;
962 }
963 });
964 Ok(())
965 }
966
967 fn update_types(&self) -> Vec<UpdateType> {
968 vec![UpdateType::Transaction]
969 }
970 }
971
972 fn create_test_updates(start_height: u64, count: u64) -> Updates {
973 use arch_program::hash::Hash;
974 use arch_program::sanitized::{ArchMessage, MessageHeader};
975 use arch_sdk::{RollbackStatus, RuntimeTransaction, Status};
976
977 let mut txs = Vec::new();
978 for i in 0..count {
979 txs.push(TransactionUpdate {
980 transaction: arch_sdk::ProcessedTransaction {
981 runtime_transaction: RuntimeTransaction {
982 version: 1,
983 signatures: vec![],
984 message: ArchMessage {
985 header: MessageHeader {
986 num_readonly_signed_accounts: 0,
987 num_readonly_unsigned_accounts: 0,
988 num_required_signatures: 0,
989 },
990 account_keys: vec![],
991 instructions: vec![],
992 recent_blockhash: Hash::default(),
993 },
994 },
995 inner_instructions_list: vec![],
996 status: Status::Processed,
997 bitcoin_txid: None,
998 logs: vec![],
999 rollback_status: RollbackStatus::NotRolledback,
1000 },
1001 height: start_height + i,
1002 });
1003 }
1004 Updates::Transactions(txs)
1005 }
1006
1007 fn create_test_block_details(start_height: u64, count: u64) -> Updates {
1008 let mut blocks = Vec::new();
1009 for i in 0..count {
1010 blocks.push(BlockDetails {
1011 height: start_height + i,
1012 block_hash: None,
1013 previous_block_hash: None,
1014 block_time: None,
1015 block_height: None,
1016 });
1017 }
1018 Updates::BlockDetails(blocks)
1019 }
1020
1021 fn create_test_accounts(start_height: u64, count: u64) -> Updates {
1022 let mut accounts = Vec::new();
1023 for i in 0..count {
1024 accounts.push(AccountUpdate {
1025 pubkey: arch_program::pubkey::Pubkey::default(),
1026 account: arch_sdk::AccountInfo {
1027 lamports: 0,
1028 owner: arch_program::pubkey::Pubkey::default(),
1029 data: vec![],
1030 utxo: String::new(),
1031 is_executable: false,
1032 },
1033 height: start_height + i,
1034 });
1035 }
1036 Updates::Accounts(accounts)
1037 }
1038
1039 fn create_test_bitcoin_blocks(start_height: u64, count: u64) -> Updates {
1040 let mut blocks = Vec::new();
1041 for i in 0..count {
1042 blocks.push(BitcoinBlock {
1043 block_height: start_height + i,
1044 block_hash: format!("hash_{}", start_height + i),
1045 });
1046 }
1047 Updates::BitcoinBlocks(blocks)
1048 }
1049
1050 #[tokio::test]
1053 async fn test_sync_state_initialization() {
1054 let state = SyncState::new(100, 200);
1055 let snapshot = state.snapshot().await;
1056
1057 assert_eq!(snapshot.last_indexed, 100);
1058 assert_eq!(snapshot.safe_tip, 200);
1059 assert_eq!(snapshot.cutoff_height, 0);
1060 assert!(!snapshot.pass_through_enabled);
1061 match snapshot.phase {
1062 SyncPhase::Backfilling { target_tip } => assert_eq!(target_tip, 200),
1063 _ => panic!("Expected Backfilling phase"),
1064 }
1065 }
1066
1067 #[tokio::test]
1068 async fn test_sync_state_set_last_indexed() {
1069 let state = SyncState::new(100, 200);
1070
1071 state.set_last_indexed(150, true).await;
1072 let snapshot = state.snapshot().await;
1073 assert_eq!(snapshot.last_indexed, 150);
1074 assert_eq!(snapshot.cutoff_height, 150);
1075
1076 state.set_last_indexed(175, false).await;
1077 let snapshot = state.snapshot().await;
1078 assert_eq!(snapshot.last_indexed, 175);
1079 assert_eq!(snapshot.cutoff_height, 150); }
1081
1082 #[tokio::test]
1083 async fn test_sync_state_set_pass_through() {
1084 let state = SyncState::new(100, 200);
1085
1086 state.set_pass_through(true).await;
1087 let snapshot = state.snapshot().await;
1088 assert!(snapshot.pass_through_enabled);
1089 match snapshot.phase {
1090 SyncPhase::LivePassthrough => {}
1091 _ => panic!("Expected LivePassthrough phase"),
1092 }
1093
1094 state.set_pass_through(false).await;
1095 let snapshot = state.snapshot().await;
1096 assert!(!snapshot.pass_through_enabled);
1097 }
1098
1099 #[tokio::test]
1100 async fn test_sync_state_set_phase() {
1101 let state = SyncState::new(100, 200);
1102
1103 state
1104 .set_phase(SyncPhase::Reconnecting { target_tip: 250 })
1105 .await;
1106 let snapshot = state.snapshot().await;
1107 match snapshot.phase {
1108 SyncPhase::Reconnecting { target_tip } => assert_eq!(target_tip, 250),
1109 _ => panic!("Expected Reconnecting phase"),
1110 }
1111 }
1112
1113 #[tokio::test]
1114 async fn test_sync_state_set_safe_tip() {
1115 let state = SyncState::new(100, 200);
1116
1117 state.set_safe_tip(300).await;
1118 let snapshot = state.snapshot().await;
1119 assert_eq!(snapshot.safe_tip, 300);
1120 }
1121
1122 #[test]
1125 fn test_filter_by_cutoff_transactions() {
1126 let mut updates = create_test_updates(100, 10);
1127 let filtered = filter_by_cutoff(&mut updates, 105);
1128
1129 assert!(filtered.is_some());
1130 if let Some(Updates::Transactions(txs)) = filtered {
1131 assert_eq!(txs.len(), 4); assert_eq!(txs[0].height, 106);
1133 assert_eq!(txs[3].height, 109);
1134 } else {
1135 panic!("Expected Transactions variant");
1136 }
1137 }
1138
1139 #[test]
1140 fn test_filter_by_cutoff_transactions_all_below() {
1141 let mut updates = create_test_updates(100, 10);
1142 let filtered = filter_by_cutoff(&mut updates, 150);
1143
1144 assert!(filtered.is_none());
1145 }
1146
1147 #[test]
1148 fn test_filter_by_cutoff_transactions_all_above() {
1149 let mut updates = create_test_updates(100, 10);
1150 let filtered = filter_by_cutoff(&mut updates, 50);
1151
1152 assert!(filtered.is_some());
1153 if let Some(Updates::Transactions(txs)) = filtered {
1154 assert_eq!(txs.len(), 10);
1155 } else {
1156 panic!("Expected Transactions variant");
1157 }
1158 }
1159
1160 #[test]
1161 fn test_filter_by_cutoff_block_details() {
1162 let mut updates = create_test_block_details(100, 10);
1163 let filtered = filter_by_cutoff(&mut updates, 105);
1164
1165 assert!(filtered.is_some());
1166 if let Some(Updates::BlockDetails(blocks)) = filtered {
1167 assert_eq!(blocks.len(), 4);
1168 assert_eq!(blocks[0].height, 106);
1169 } else {
1170 panic!("Expected BlockDetails variant");
1171 }
1172 }
1173
1174 #[test]
1175 fn test_filter_by_cutoff_accounts() {
1176 let mut updates = create_test_accounts(100, 10);
1177 let filtered = filter_by_cutoff(&mut updates, 105);
1178
1179 assert!(filtered.is_some());
1180 if let Some(Updates::Accounts(accounts)) = filtered {
1181 assert_eq!(accounts.len(), 4);
1182 assert_eq!(accounts[0].height, 106);
1183 } else {
1184 panic!("Expected Accounts variant");
1185 }
1186 }
1187
1188 #[test]
1189 fn test_filter_by_cutoff_bitcoin_blocks() {
1190 let mut updates = create_test_bitcoin_blocks(100, 10);
1191 let filtered = filter_by_cutoff(&mut updates, 105);
1192
1193 assert!(filtered.is_some());
1194 if let Some(Updates::BitcoinBlocks(blocks)) = filtered {
1195 assert_eq!(blocks.len(), 4);
1196 assert_eq!(blocks[0].block_height, 106);
1197 } else {
1198 panic!("Expected BitcoinBlocks variant");
1199 }
1200 }
1201
1202 #[test]
1203 fn test_filter_by_cutoff_account_deletions() {
1204 let mut deletions = Vec::new();
1205 for i in 0..10 {
1206 deletions.push(AccountDeletion {
1207 pubkey: arch_program::pubkey::Pubkey::default(),
1208 height: 100 + i,
1209 });
1210 }
1211 let mut updates = Updates::AccountDeletions(deletions);
1212 let filtered = filter_by_cutoff(&mut updates, 105);
1213
1214 assert!(filtered.is_some());
1215 if let Some(Updates::AccountDeletions(dels)) = filtered {
1216 assert_eq!(dels.len(), 4);
1217 assert_eq!(dels[0].height, 106);
1218 } else {
1219 panic!("Expected AccountDeletions variant");
1220 }
1221 }
1222
1223 #[test]
1224 fn test_filter_by_cutoff_rollbacked_transactions() {
1225 let events = vec![crate::datasource::RolledbackTransactionsEvent {
1226 height: 100,
1227 transaction_hashes: vec!["hash1".to_string()],
1228 }];
1229 let mut updates = Updates::RolledbackTransactions(events.clone());
1230 let filtered = filter_by_cutoff(&mut updates, 150);
1231
1232 assert!(filtered.is_some());
1234 if let Some(Updates::RolledbackTransactions(filtered_events)) = filtered {
1235 assert_eq!(filtered_events.len(), 1);
1236 } else {
1237 panic!("Expected RolledbackTransactions variant");
1238 }
1239 }
1240
1241 #[test]
1244 fn test_max_height_in_updates_transactions() {
1245 let updates = create_test_updates(100, 10);
1246 let max_height = max_height_in_updates(&updates);
1247 assert_eq!(max_height, Some(109));
1248 }
1249
1250 #[test]
1251 fn test_max_height_in_updates_block_details() {
1252 let updates = create_test_block_details(50, 5);
1253 let max_height = max_height_in_updates(&updates);
1254 assert_eq!(max_height, Some(54));
1255 }
1256
/// The batch built by `create_test_accounts(200, 3)` must report 202 as its
/// maximum height.
#[test]
fn test_max_height_in_updates_accounts() {
    let account_updates = create_test_accounts(200, 3);
    assert_eq!(max_height_in_updates(&account_updates), Some(202));
}
1263
/// The batch built by `create_test_bitcoin_blocks(300, 4)` must report 303
/// as its maximum height.
#[test]
fn test_max_height_in_updates_bitcoin_blocks() {
    let bitcoin_updates = create_test_bitcoin_blocks(300, 4);
    assert_eq!(max_height_in_updates(&bitcoin_updates), Some(303));
}
1270
/// An empty transaction batch has no maximum height.
#[test]
fn test_max_height_in_updates_empty() {
    let no_updates = Updates::Transactions(Vec::new());
    assert_eq!(max_height_in_updates(&no_updates), None);
}
1277
/// A rollback batch is expected to yield no maximum height, even though the
/// event itself carries a height of 100.
#[test]
fn test_max_height_in_updates_rollbacked() {
    let rollback = crate::datasource::RolledbackTransactionsEvent {
        height: 100,
        transaction_hashes: vec!["hash1".to_string()],
    };
    let updates = Updates::RolledbackTransactions(vec![rollback]);

    assert_eq!(max_height_in_updates(&updates), None);
}
1288
1289 #[tokio::test]
1292 async fn test_initialize_state_and_metrics() {
1293 let tip = Arc::new(MockTipSource::new(500));
1294 let checkpoint = Arc::new(MockCheckpointStore::new(100));
1295 let backfill = Arc::new(MockBackfillSource::new());
1296 let live = Arc::new(MockLiveSource::new());
1297 let metrics = Arc::new(MetricsCollection::default());
1298
1299 let datasource = SyncingDatasource::new(
1300 tip,
1301 checkpoint,
1302 backfill,
1303 live,
1304 vec![UpdateType::Transaction],
1305 SyncConfig::default(),
1306 );
1307
1308 let (last_indexed, best_height, safe_tip) = datasource
1309 .initialize_state_and_metrics(metrics)
1310 .await
1311 .unwrap();
1312
1313 assert_eq!(last_indexed, 100);
1314 assert_eq!(best_height, 500);
1315 assert_eq!(safe_tip, 500);
1316 }
1317
1318 #[tokio::test]
1319 async fn test_backfill_if_needed_no_backfill() {
1320 let tip = Arc::new(MockTipSource::new(500));
1321 let checkpoint = Arc::new(MockCheckpointStore::new(500));
1322 let backfill = Arc::new(MockBackfillSource::new());
1323 let live = Arc::new(MockLiveSource::new());
1324 let metrics = Arc::new(MetricsCollection::default());
1325
1326 let datasource = SyncingDatasource::new(
1327 tip,
1328 checkpoint,
1329 backfill,
1330 live,
1331 vec![UpdateType::Transaction],
1332 SyncConfig::default(),
1333 );
1334
1335 let state = Arc::new(SyncState::new(500, 500));
1336 let (tx, _rx) = mpsc::channel(1000);
1337 let cancel = CancellationToken::new();
1338
1339 let result = datasource
1340 .backfill_if_needed(&state, 500, tx, cancel, metrics)
1341 .await;
1342
1343 assert!(result.is_ok());
1344 let snapshot = state.snapshot().await;
1345 assert_eq!(snapshot.last_indexed, 500);
1346 }
1347
1348 #[tokio::test]
1349 async fn test_backfill_if_needed_with_backfill() {
1350 let tip = Arc::new(MockTipSource::new(500));
1351 let checkpoint = Arc::new(MockCheckpointStore::new(100));
1352 let mut mock_backfill = MockBackfillSource::new();
1353 mock_backfill.add_range(101, 200, create_test_updates(101, 100));
1354 let backfill = Arc::new(mock_backfill);
1355 let live = Arc::new(MockLiveSource::new());
1356 let metrics = Arc::new(MetricsCollection::default());
1357
1358 let datasource = SyncingDatasource::new(
1359 tip,
1360 checkpoint,
1361 backfill,
1362 live,
1363 vec![UpdateType::Transaction],
1364 SyncConfig {
1365 batch_size: 100,
1366 ..Default::default()
1367 },
1368 );
1369
1370 let state = Arc::new(SyncState::new(100, 200));
1371 let (tx, mut rx) = mpsc::channel(1000);
1372 let cancel = CancellationToken::new();
1373
1374 let result = datasource
1375 .backfill_if_needed(&state, 200, tx.clone(), cancel, metrics)
1376 .await;
1377
1378 assert!(result.is_ok());
1379 let snapshot = state.snapshot().await;
1380 assert_eq!(snapshot.last_indexed, 200);
1381
1382 let mut received_count = 0;
1384 while let Ok(Some(_)) =
1385 tokio::time::timeout(tokio::time::Duration::from_millis(100), rx.recv()).await
1386 {
1387 received_count += 1;
1388 }
1389 assert!(received_count > 0);
1390 }
1391
1392 #[tokio::test]
1393 async fn test_enable_live_passthrough() {
1394 let tip = Arc::new(MockTipSource::new(500));
1395 let checkpoint = Arc::new(MockCheckpointStore::new(200));
1396 let backfill = Arc::new(MockBackfillSource::new());
1397 let live = Arc::new(MockLiveSource::new());
1398 let metrics = Arc::new(MetricsCollection::default());
1399
1400 let datasource = SyncingDatasource::new(
1401 tip,
1402 checkpoint,
1403 backfill,
1404 live,
1405 vec![UpdateType::Transaction],
1406 SyncConfig::default(),
1407 );
1408
1409 let state = Arc::new(SyncState::new(200, 200));
1410 state.set_last_indexed(200, true).await;
1411
1412 let result = datasource.enable_live_passthrough(&state, metrics).await;
1413 assert!(result.is_ok());
1414
1415 let snapshot = state.snapshot().await;
1416 assert!(snapshot.pass_through_enabled);
1417 assert_eq!(snapshot.cutoff_height, 200);
1418 match snapshot.phase {
1419 SyncPhase::LivePassthrough => {}
1420 _ => panic!("Expected LivePassthrough phase"),
1421 }
1422 }
1423
1424 #[tokio::test]
1425 async fn test_catch_up_to_current_tip() {
1426 let tip = Arc::new(MockTipSource::new(250));
1427 let checkpoint = Arc::new(MockCheckpointStore::new(200));
1428 let checkpoint_clone = Arc::clone(&checkpoint);
1429 let mut mock_backfill = MockBackfillSource::new();
1430 mock_backfill.add_range(201, 250, create_test_updates(201, 50));
1431 let backfill = Arc::new(mock_backfill);
1432 let live = Arc::new(MockLiveSource::new());
1433 let metrics = Arc::new(MetricsCollection::default());
1434
1435 let datasource = SyncingDatasource::new(
1436 tip,
1437 checkpoint,
1438 backfill,
1439 live,
1440 vec![UpdateType::Transaction],
1441 SyncConfig::default(),
1442 );
1443
1444 let state = Arc::new(SyncState::new(200, 200));
1445 let (tx, mut rx) = mpsc::channel(1000);
1446 let cancel = CancellationToken::new();
1447
1448 let result = datasource
1449 .catch_up_to_current_tip(&state, tx.clone(), cancel, metrics)
1450 .await;
1451
1452 assert!(result.is_ok());
1453 let snapshot = state.snapshot().await;
1454 assert_eq!(snapshot.last_indexed, 250);
1455 let checkpoint_height = checkpoint_clone.get_last_indexed();
1457 assert_eq!(checkpoint_height, 250);
1458
1459 let mut received = false;
1461 while let Ok(Some(_)) =
1462 tokio::time::timeout(tokio::time::Duration::from_millis(100), rx.recv()).await
1463 {
1464 received = true;
1465 }
1466 assert!(received);
1467 }
1468
1469 #[tokio::test]
1470 async fn test_catch_up_to_current_tip_no_gap() {
1471 let tip = Arc::new(MockTipSource::new(200));
1472 let checkpoint = Arc::new(MockCheckpointStore::new(200));
1473 let backfill = Arc::new(MockBackfillSource::new());
1474 let live = Arc::new(MockLiveSource::new());
1475 let metrics = Arc::new(MetricsCollection::default());
1476
1477 let datasource = SyncingDatasource::new(
1478 tip,
1479 checkpoint,
1480 backfill,
1481 live,
1482 vec![UpdateType::Transaction],
1483 SyncConfig::default(),
1484 );
1485
1486 let state = Arc::new(SyncState::new(200, 200));
1487 let (tx, _rx) = mpsc::channel(1000);
1488 let cancel = CancellationToken::new();
1489
1490 let result = datasource
1491 .catch_up_to_current_tip(&state, tx, cancel, metrics)
1492 .await;
1493
1494 assert!(result.is_ok());
1495 let snapshot = state.snapshot().await;
1496 assert_eq!(snapshot.last_indexed, 200);
1497 }
1498
1499 #[tokio::test]
1500 async fn test_reconcile_if_behind() {
1501 let tip = Arc::new(MockTipSource::new(300));
1502 let checkpoint = Arc::new(MockCheckpointStore::new(200));
1503 let mut mock_backfill = MockBackfillSource::new();
1504 mock_backfill.add_range(201, 300, create_test_updates(201, 100));
1505 let backfill = Arc::new(mock_backfill);
1506 let live = Arc::new(MockLiveSource::new());
1507 let metrics = Arc::new(MetricsCollection::default());
1508
1509 let datasource = SyncingDatasource::new(
1510 tip,
1511 checkpoint,
1512 backfill,
1513 live,
1514 vec![UpdateType::Transaction],
1515 SyncConfig {
1516 batch_size: 100,
1517 ..Default::default()
1518 },
1519 );
1520
1521 let state = Arc::new(SyncState::new(200, 200));
1522 state.set_pass_through(true).await; let (tx, _rx) = mpsc::channel(1000);
1524 let cancel = CancellationToken::new();
1525
1526 let result = datasource
1527 .reconcile_if_behind(&state, tx, cancel, metrics)
1528 .await;
1529
1530 assert!(result.is_ok());
1531 let snapshot = state.snapshot().await;
1532 assert_eq!(snapshot.last_indexed, 300);
1533 assert!(snapshot.pass_through_enabled); }
1535
1536 #[tokio::test]
1537 async fn test_reconcile_if_behind_no_gap() {
1538 let tip = Arc::new(MockTipSource::new(200));
1539 let checkpoint = Arc::new(MockCheckpointStore::new(200));
1540 let backfill = Arc::new(MockBackfillSource::new());
1541 let live = Arc::new(MockLiveSource::new());
1542 let metrics = Arc::new(MetricsCollection::default());
1543
1544 let datasource = SyncingDatasource::new(
1545 tip,
1546 checkpoint,
1547 backfill,
1548 live,
1549 vec![UpdateType::Transaction],
1550 SyncConfig::default(),
1551 );
1552
1553 let state = Arc::new(SyncState::new(200, 200));
1554 let (tx, _rx) = mpsc::channel(1000);
1555 let cancel = CancellationToken::new();
1556
1557 let result = datasource
1558 .reconcile_if_behind(&state, tx, cancel, metrics)
1559 .await;
1560
1561 assert!(result.is_ok());
1562 let snapshot = state.snapshot().await;
1563 assert_eq!(snapshot.last_indexed, 200);
1564 }
1565
1566 #[tokio::test]
1567 async fn test_backfill_cancellation() {
1568 let tip = Arc::new(MockTipSource::new(500));
1569 let checkpoint = Arc::new(MockCheckpointStore::new(100));
1570 let backfill = Arc::new(MockBackfillSource::new());
1571 let live = Arc::new(MockLiveSource::new());
1572 let metrics = Arc::new(MetricsCollection::default());
1573
1574 let datasource = SyncingDatasource::new(
1575 tip,
1576 checkpoint,
1577 backfill,
1578 live,
1579 vec![UpdateType::Transaction],
1580 SyncConfig::default(),
1581 );
1582
1583 let state = Arc::new(SyncState::new(100, 500));
1584 let (tx, _rx) = mpsc::channel(1000);
1585 let cancel = CancellationToken::new();
1586 cancel.cancel(); let result = datasource
1589 .backfill_if_needed(&state, 500, tx, cancel, metrics)
1590 .await;
1591
1592 assert!(result.is_ok()); }
1594
1595 #[tokio::test]
1596 async fn test_backfill_batch_processing() {
1597 let tip = Arc::new(MockTipSource::new(500));
1598 let checkpoint = Arc::new(MockCheckpointStore::new(100));
1599 let mut mock_backfill = MockBackfillSource::new();
1600 mock_backfill.add_range(101, 200, create_test_updates(101, 100));
1602 mock_backfill.add_range(201, 300, create_test_updates(201, 100));
1603 mock_backfill.add_range(301, 400, create_test_updates(301, 100));
1604 let backfill = Arc::new(mock_backfill);
1605 let live = Arc::new(MockLiveSource::new());
1606 let metrics = Arc::new(MetricsCollection::default());
1607
1608 let datasource = SyncingDatasource::new(
1609 tip,
1610 checkpoint,
1611 backfill,
1612 live,
1613 vec![UpdateType::Transaction],
1614 SyncConfig {
1615 batch_size: 100,
1616 ..Default::default()
1617 },
1618 );
1619
1620 let state = Arc::new(SyncState::new(100, 400));
1621 let (tx, _rx) = mpsc::channel(1000);
1622 let cancel = CancellationToken::new();
1623
1624 let result = datasource
1625 .backfill_if_needed(&state, 400, tx, cancel, metrics)
1626 .await;
1627
1628 assert!(result.is_ok());
1629 let snapshot = state.snapshot().await;
1630 assert_eq!(snapshot.last_indexed, 400);
1631 }
1632
/// Filtering `create_test_updates(100, 5)` with a cutoff of 104 must leave
/// nothing behind.
#[test]
fn test_filter_by_cutoff_exactly_at_cutoff() {
    let mut batch = create_test_updates(100, 5);

    let remaining = filter_by_cutoff(&mut batch, 104);

    assert!(remaining.is_none());
}
1645
/// Filtering an empty transaction batch must yield `None`.
#[test]
fn test_filter_by_cutoff_empty_updates() {
    let mut empty_batch = Updates::Transactions(Vec::new());

    let remaining = filter_by_cutoff(&mut empty_batch, 100);

    assert!(remaining.is_none());
}
1652
/// Transactions at heights 50, 100, 101, 150 and 200 filtered with cutoff 100
/// must leave exactly 101, 150 and 200, in that order.
#[test]
fn test_filter_by_cutoff_mixed_heights() {
    use arch_program::hash::Hash;
    use arch_program::sanitized::{ArchMessage, MessageHeader};
    use arch_sdk::{RollbackStatus, RuntimeTransaction, Status};

    // One minimal transaction per height; only `height` differs between them.
    let txs: Vec<_> = [50, 100, 101, 150, 200]
        .iter()
        .map(|&height| TransactionUpdate {
            transaction: arch_sdk::ProcessedTransaction {
                runtime_transaction: RuntimeTransaction {
                    version: 1,
                    signatures: vec![],
                    message: ArchMessage {
                        header: MessageHeader {
                            num_readonly_signed_accounts: 0,
                            num_readonly_unsigned_accounts: 0,
                            num_required_signatures: 0,
                        },
                        account_keys: vec![],
                        instructions: vec![],
                        recent_blockhash: Hash::default(),
                    },
                },
                inner_instructions_list: vec![],
                status: Status::Processed,
                bitcoin_txid: None,
                logs: vec![],
                rollback_status: RollbackStatus::NotRolledback,
            },
            height,
        })
        .collect();
    let mut updates = Updates::Transactions(txs);

    let filtered = filter_by_cutoff(&mut updates, 100);

    assert!(filtered.is_some());
    match filtered {
        Some(Updates::Transactions(kept)) => {
            // Heights 50 and 100 are gone; the rest keep their order.
            assert_eq!(kept.len(), 3);
            assert_eq!(kept[0].height, 101);
            assert_eq!(kept[1].height, 150);
            assert_eq!(kept[2].height, 200);
        }
        _ => panic!("Expected Transactions variant"),
    }
}
1701
/// A reapplied-transactions event is expected to survive `filter_by_cutoff`
/// with both of its hashes, even though its height (100) is below the cutoff
/// used here (150).
#[test]
fn test_filter_by_cutoff_reapplied_transactions() {
    let events = vec![crate::datasource::ReappliedTransactionsEvent {
        height: 100,
        transaction_hashes: vec!["hash1".to_string(), "hash2".to_string()],
    }];
    // `events` is not needed afterwards, so it is moved instead of cloned.
    let mut updates = Updates::ReappliedTransactions(events);
    let filtered = filter_by_cutoff(&mut updates, 150);

    assert!(filtered.is_some());
    match filtered {
        Some(Updates::ReappliedTransactions(filtered_events)) => {
            assert_eq!(filtered_events.len(), 1);
            assert_eq!(filtered_events[0].transaction_hashes.len(), 2);
        }
        _ => panic!("Expected ReappliedTransactions variant"),
    }
}
1720
/// Five account deletions at heights 100..=104 must report 104 as the
/// maximum height.
#[test]
fn test_max_height_in_updates_account_deletions() {
    let deletions: Vec<_> = (0..5)
        .map(|offset| AccountDeletion {
            pubkey: arch_program::pubkey::Pubkey::default(),
            height: 100 + offset,
        })
        .collect();
    let updates = Updates::AccountDeletions(deletions);

    assert_eq!(max_height_in_updates(&updates), Some(104));
}
1734
1735 #[tokio::test]
1736 async fn test_backfill_single_block() {
1737 let tip = Arc::new(MockTipSource::new(101));
1738 let checkpoint = Arc::new(MockCheckpointStore::new(100));
1739 let mut mock_backfill = MockBackfillSource::new();
1740 mock_backfill.add_range(101, 101, create_test_updates(101, 1));
1741 let backfill = Arc::new(mock_backfill);
1742 let live = Arc::new(MockLiveSource::new());
1743 let metrics = Arc::new(MetricsCollection::default());
1744
1745 let datasource = SyncingDatasource::new(
1746 tip,
1747 checkpoint,
1748 backfill,
1749 live,
1750 vec![UpdateType::Transaction],
1751 SyncConfig {
1752 batch_size: 1000,
1753 ..Default::default()
1754 },
1755 );
1756
1757 let state = Arc::new(SyncState::new(100, 101));
1758 let (tx, mut rx) = mpsc::channel(1000);
1759 let cancel = CancellationToken::new();
1760
1761 let result = datasource
1762 .backfill_if_needed(&state, 101, tx.clone(), cancel, metrics)
1763 .await;
1764
1765 assert!(result.is_ok());
1766 let snapshot = state.snapshot().await;
1767 assert_eq!(snapshot.last_indexed, 101);
1768
1769 let received = rx.recv().await;
1771 assert!(received.is_some());
1772 }
1773
1774 #[tokio::test]
1775 async fn test_backfill_already_at_tip() {
1776 let tip = Arc::new(MockTipSource::new(100));
1777 let checkpoint = Arc::new(MockCheckpointStore::new(100));
1778 let backfill = Arc::new(MockBackfillSource::new());
1779 let live = Arc::new(MockLiveSource::new());
1780 let metrics = Arc::new(MetricsCollection::default());
1781
1782 let datasource = SyncingDatasource::new(
1783 tip,
1784 checkpoint,
1785 backfill,
1786 live,
1787 vec![UpdateType::Transaction],
1788 SyncConfig::default(),
1789 );
1790
1791 let state = Arc::new(SyncState::new(100, 100));
1792 let (tx, _rx) = mpsc::channel(1000);
1793 let cancel = CancellationToken::new();
1794
1795 let result = datasource
1796 .backfill_if_needed(&state, 100, tx, cancel, metrics)
1797 .await;
1798
1799 assert!(result.is_ok());
1800 let snapshot = state.snapshot().await;
1801 assert_eq!(snapshot.last_indexed, 100);
1802 }
1803
1804 #[tokio::test]
1805 async fn test_backfill_past_tip() {
1806 let tip = Arc::new(MockTipSource::new(100));
1807 let checkpoint = Arc::new(MockCheckpointStore::new(150));
1808 let backfill = Arc::new(MockBackfillSource::new());
1809 let live = Arc::new(MockLiveSource::new());
1810 let metrics = Arc::new(MetricsCollection::default());
1811
1812 let datasource = SyncingDatasource::new(
1813 tip,
1814 checkpoint,
1815 backfill,
1816 live,
1817 vec![UpdateType::Transaction],
1818 SyncConfig::default(),
1819 );
1820
1821 let state = Arc::new(SyncState::new(150, 100));
1822 let (tx, _rx) = mpsc::channel(1000);
1823 let cancel = CancellationToken::new();
1824
1825 let result = datasource
1826 .backfill_if_needed(&state, 100, tx, cancel, metrics)
1827 .await;
1828
1829 assert!(result.is_ok());
1830 let snapshot = state.snapshot().await;
1831 assert_eq!(snapshot.last_indexed, 150); }
1833
1834 #[tokio::test]
1835 async fn test_backfill_small_batch_size() {
1836 let tip = Arc::new(MockTipSource::new(105));
1837 let checkpoint = Arc::new(MockCheckpointStore::new(100));
1838 let mut mock_backfill = MockBackfillSource::new();
1839 mock_backfill.add_range(101, 101, create_test_updates(101, 1));
1840 mock_backfill.add_range(102, 102, create_test_updates(102, 1));
1841 mock_backfill.add_range(103, 103, create_test_updates(103, 1));
1842 mock_backfill.add_range(104, 104, create_test_updates(104, 1));
1843 mock_backfill.add_range(105, 105, create_test_updates(105, 1));
1844 let backfill = Arc::new(mock_backfill);
1845 let live = Arc::new(MockLiveSource::new());
1846 let metrics = Arc::new(MetricsCollection::default());
1847
1848 let datasource = SyncingDatasource::new(
1849 tip,
1850 checkpoint,
1851 backfill,
1852 live,
1853 vec![UpdateType::Transaction],
1854 SyncConfig {
1855 batch_size: 1,
1856 ..Default::default()
1857 },
1858 );
1859
1860 let state = Arc::new(SyncState::new(100, 105));
1861 let (tx, mut rx) = mpsc::channel(1000);
1862 let cancel = CancellationToken::new();
1863
1864 let result = datasource
1865 .backfill_if_needed(&state, 105, tx.clone(), cancel, metrics)
1866 .await;
1867
1868 assert!(result.is_ok());
1869 let snapshot = state.snapshot().await;
1870 assert_eq!(snapshot.last_indexed, 105);
1871
1872 let mut received_count = 0;
1874 while let Ok(Some(_)) =
1875 tokio::time::timeout(tokio::time::Duration::from_millis(100), rx.recv()).await
1876 {
1877 received_count += 1;
1878 }
1879 assert_eq!(received_count, 5);
1880 }
1881
1882 #[tokio::test]
1883 async fn test_sync_state_concurrent_updates() {
1884 let state = Arc::new(SyncState::new(100, 200));
1885
1886 let state1 = Arc::clone(&state);
1888 let state2 = Arc::clone(&state);
1889 let state3 = Arc::clone(&state);
1890
1891 tokio::join!(
1892 async move { state1.set_last_indexed(150, false).await },
1893 async move { state2.set_safe_tip(250).await },
1894 async move { state3.set_pass_through(true).await },
1895 );
1896
1897 let snapshot = state.snapshot().await;
1898 assert_eq!(snapshot.last_indexed, 150);
1899 assert_eq!(snapshot.safe_tip, 250);
1900 assert!(snapshot.pass_through_enabled);
1901 }
1902
1903 #[tokio::test]
1904 async fn test_reconcile_state_transitions() {
1905 let tip = Arc::new(MockTipSource::new(300));
1906 let checkpoint = Arc::new(MockCheckpointStore::new(200));
1907 let mut mock_backfill = MockBackfillSource::new();
1908 mock_backfill.add_range(201, 300, create_test_updates(201, 100));
1909 let backfill = Arc::new(mock_backfill);
1910 let live = Arc::new(MockLiveSource::new());
1911 let metrics = Arc::new(MetricsCollection::default());
1912
1913 let datasource = SyncingDatasource::new(
1914 tip,
1915 checkpoint,
1916 backfill,
1917 live,
1918 vec![UpdateType::Transaction],
1919 SyncConfig {
1920 batch_size: 100,
1921 ..Default::default()
1922 },
1923 );
1924
1925 let state = Arc::new(SyncState::new(200, 200));
1926 state.set_pass_through(true).await;
1927
1928 let snapshot = state.snapshot().await;
1930 assert!(snapshot.pass_through_enabled);
1931 match snapshot.phase {
1932 SyncPhase::LivePassthrough => {}
1933 _ => panic!("Expected LivePassthrough phase"),
1934 }
1935
1936 let (tx, _rx) = mpsc::channel(1000);
1937 let cancel = CancellationToken::new();
1938
1939 let result = datasource
1940 .reconcile_if_behind(&state, tx, cancel, metrics)
1941 .await;
1942
1943 assert!(result.is_ok());
1944
1945 let snapshot = state.snapshot().await;
1947 assert_eq!(snapshot.last_indexed, 300);
1948 assert!(snapshot.pass_through_enabled); match snapshot.phase {
1950 SyncPhase::LivePassthrough => {}
1951 _ => panic!("Expected LivePassthrough phase after reconciliation"),
1952 }
1953 }
1954
/// Exercises the two extreme cutoffs: 0 must keep every update built by
/// `create_test_updates(1, 5)`, and `u64::MAX` must drop everything.
#[test]
fn test_filter_by_cutoff_boundary_conditions() {
    // Cutoff 0: all five updates survive.
    let mut low_batch = create_test_updates(1, 5);
    let kept = filter_by_cutoff(&mut low_batch, 0);
    assert!(kept.is_some());
    match kept {
        Some(Updates::Transactions(txs)) => assert_eq!(txs.len(), 5),
        _ => panic!("Expected Transactions variant"),
    }

    // Cutoff u64::MAX: nothing survives.
    let mut high_batch = create_test_updates(100, 5);
    let dropped = filter_by_cutoff(&mut high_batch, u64::MAX);
    assert!(dropped.is_none());
}
1972
1973 #[tokio::test]
1974 async fn test_catch_up_with_empty_backfill() {
1975 let tip = Arc::new(MockTipSource::new(200));
1977 let checkpoint = Arc::new(MockCheckpointStore::new(200));
1978 let backfill = Arc::new(MockBackfillSource::new());
1979 let live = Arc::new(MockLiveSource::new());
1980 let metrics = Arc::new(MetricsCollection::default());
1981
1982 let datasource = SyncingDatasource::new(
1983 tip,
1984 checkpoint,
1985 backfill,
1986 live,
1987 vec![UpdateType::Transaction],
1988 SyncConfig::default(),
1989 );
1990
1991 let state = Arc::new(SyncState::new(200, 200));
1992 let (tx, _rx) = mpsc::channel(1000);
1993 let cancel = CancellationToken::new();
1994
1995 let result = datasource
1996 .catch_up_to_current_tip(&state, tx, cancel, metrics)
1997 .await;
1998
1999 assert!(result.is_ok());
2000 let snapshot = state.snapshot().await;
2001 assert_eq!(snapshot.last_indexed, 200);
2002 }
2003
2004 #[tokio::test]
2005 async fn test_backfill_with_tip_advancing() {
2006 let tip = Arc::new(MockTipSource::new(200));
2008 let checkpoint = Arc::new(MockCheckpointStore::new(100));
2009 let mut mock_backfill = MockBackfillSource::new();
2010 mock_backfill.add_range(101, 200, create_test_updates(101, 100));
2011 let backfill = Arc::new(mock_backfill);
2012 let live = Arc::new(MockLiveSource::new());
2013 let metrics = Arc::new(MetricsCollection::default());
2014
2015 let datasource = SyncingDatasource::new(
2016 tip,
2017 checkpoint,
2018 backfill,
2019 live,
2020 vec![UpdateType::Transaction],
2021 SyncConfig {
2022 batch_size: 50,
2023 ..Default::default()
2024 },
2025 );
2026
2027 let state = Arc::new(SyncState::new(100, 200));
2028 let (tx, _rx) = mpsc::channel(1000);
2029 let cancel = CancellationToken::new();
2030
2031 let result = datasource
2033 .backfill_if_needed(&state, 200, tx, cancel, metrics)
2034 .await;
2035
2036 assert!(result.is_ok());
2037 let snapshot = state.snapshot().await;
2038 assert_eq!(snapshot.last_indexed, 200);
2039 }
2040}