atlas_arch/
sync.rs

1use std::collections::VecDeque;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use tokio::sync::{mpsc, RwLock};
6use tokio_util::sync::CancellationToken;
7
8use crate::datasource::{Datasource, DatasourceId, UpdateType, Updates};
9use crate::error::IndexerResult;
10use crate::metrics::MetricsCollection;
11
// A separate SyncMode type is no longer needed; the datasource manages its sync modes internally.
13
/// Phase of the synchronization state machine, stored in the shared sync state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SyncPhase {
    /// Historical backfill is running toward `target_tip`.
    Backfilling { target_tip: u64 },
    /// Live updates are being passed straight through to the consumer.
    LivePassthrough,
    /// The live stream reconnected and the gap up to `target_tip` is being caught up.
    Reconnecting { target_tip: u64 },
}
24
25/// Internal state structure for synchronization
26#[derive(Debug, Clone)]
27struct SyncStateInner {
28    phase: SyncPhase,
29    last_indexed: u64,
30    cutoff_height: u64,
31    safe_tip: u64,
32    pass_through_enabled: bool,
33}
34
35/// Shared synchronization state accessible across tasks
36pub struct SyncState {
37    inner: Arc<RwLock<SyncStateInner>>,
38}
39
40impl SyncState {
41    /// Create a new SyncState with initial values
42    fn new(last_indexed: u64, safe_tip: u64) -> Self {
43        Self {
44            inner: Arc::new(RwLock::new(SyncStateInner {
45                phase: SyncPhase::Backfilling {
46                    target_tip: safe_tip,
47                },
48                last_indexed,
49                cutoff_height: 0,
50                safe_tip,
51                pass_through_enabled: false,
52            })),
53        }
54    }
55
56    /// Get a snapshot of the current state (read-only)
57    async fn snapshot(&self) -> SyncStateInner {
58        self.inner.read().await.clone()
59    }
60
61    /// Update state with a closure
62    async fn update<F>(&self, f: F)
63    where
64        F: FnOnce(&mut SyncStateInner),
65    {
66        let mut guard = self.inner.write().await;
67        f(&mut guard);
68    }
69
70    /// Set the last indexed height and update cutoff if needed
71    async fn set_last_indexed(&self, height: u64, update_cutoff: bool) {
72        self.update(|state| {
73            state.last_indexed = height;
74            if update_cutoff {
75                state.cutoff_height = height;
76            }
77        })
78        .await;
79    }
80
81    /// Set the cutoff height
82    async fn set_cutoff_height(&self, height: u64) {
83        self.update(|state| {
84            state.cutoff_height = height;
85        })
86        .await;
87    }
88
89    /// Enable or disable passthrough
90    async fn set_pass_through(&self, enabled: bool) {
91        self.update(|state| {
92            state.pass_through_enabled = enabled;
93            if enabled {
94                state.phase = SyncPhase::LivePassthrough;
95            }
96        })
97        .await;
98    }
99
100    /// Set the sync phase
101    async fn set_phase(&self, phase: SyncPhase) {
102        self.update(|state| {
103            state.phase = phase;
104        })
105        .await;
106    }
107
108    /// Update safe tip
109    async fn set_safe_tip(&self, tip: u64) {
110        self.update(|state| {
111            state.safe_tip = tip;
112        })
113        .await;
114    }
115}
116
/// Tunables for the syncing datasource.
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// Number of heights requested per backfill batch.
    pub batch_size: u64,
    /// Max number of live updates to buffer while backfilling
    pub max_live_buffer: usize,
    /// Start live stream when we are within this many blocks from tip
    pub live_start_distance: u64,
    /// Log progress (with ETA) every N backfill batches
    pub progress_log_every_batches: u64,
}

impl Default for SyncConfig {
    /// Defaults: 1 000-height batches, a 10 000-entry live buffer, a live
    /// start distance of 128 and a progress log every 10 batches.
    fn default() -> Self {
        const BATCH_SIZE: u64 = 1_000;
        const MAX_LIVE_BUFFER: usize = 10_000;
        const LIVE_START_DISTANCE: u64 = 128;
        const PROGRESS_LOG_EVERY_BATCHES: u64 = 10;

        SyncConfig {
            batch_size: BATCH_SIZE,
            max_live_buffer: MAX_LIVE_BUFFER,
            live_start_distance: LIVE_START_DISTANCE,
            progress_log_every_batches: PROGRESS_LOG_EVERY_BATCHES,
        }
    }
}
138
139/// Provides the current best chain height for synchronization decisions.
140///
141/// Implementors should return the node's best known height quickly and may
142/// cache internally to avoid excessive I/O.
143#[async_trait]
144pub trait TipSource: Send + Sync {
145    /// Returns the current best height (tip) of the canonical chain.
146    async fn best_height(&self) -> IndexerResult<u64>;
147}
148
149/// Persists and retrieves the last fully indexed height.
150///
151/// Implementations must be durable and safe for concurrent callers.
152#[async_trait]
153pub trait CheckpointStore: Send + Sync {
154    /// Read the last successfully indexed height.
155    async fn last_indexed_height(&self) -> IndexerResult<u64>;
156    /// Persist the last successfully indexed height atomically.
157    async fn set_last_indexed_height(&self, height: u64) -> IndexerResult<()>;
158}
159
160/// Produces historical updates in height ranges for backfilling.
161///
162/// Implementations should chunk internally as needed and respect
163/// `cancellation_token` promptly. All emitted updates must have heights within
164/// the requested inclusive range.
165#[async_trait]
166pub trait BackfillSource: Send + Sync {
167    /// Backfill updates from `start_height_inclusive` to `end_height_inclusive`.
168    async fn backfill_range(
169        &self,
170        start_height_inclusive: u64,
171        end_height_inclusive: u64,
172        sender: tokio::sync::mpsc::Sender<(Updates, DatasourceId)>,
173        cancellation_token: CancellationToken,
174        metrics: Arc<MetricsCollection>,
175    ) -> IndexerResult<()>;
176}
177
178/// Streams live updates as the chain advances.
179///
180/// Implementations should begin from the provided height (exclusive) and emit
181/// only newer updates, resuming seamlessly across restarts if possible.
182#[async_trait]
183pub trait LiveSource: Send + Sync {
184    /// Start live streaming from the specified height (exclusive).
185    async fn consume_live(
186        &self,
187        start_from_height_exclusive: u64,
188        sender: tokio::sync::mpsc::Sender<(Updates, DatasourceId)>,
189        cancellation_token: CancellationToken,
190        metrics: Arc<MetricsCollection>,
191        reconnection_notifier: tokio::sync::mpsc::Sender<()>,
192    ) -> IndexerResult<()>;
193
194    /// Advertise live update kinds this source will emit.
195    fn update_types(&self) -> Vec<UpdateType>;
196}
197
198#[derive(Clone)]
199pub struct SyncingDatasource {
200    tip: Arc<dyn TipSource>,
201    checkpoint: Arc<dyn CheckpointStore>,
202    backfill: Arc<dyn BackfillSource>,
203    live: Arc<dyn LiveSource>,
204    update_types: Vec<UpdateType>,
205    config: SyncConfig,
206}
207
208impl SyncingDatasource {
209    async fn catch_up_to_current_tip(
210        &self,
211        state: &SyncState,
212        sender: tokio::sync::mpsc::Sender<(Updates, DatasourceId)>,
213        cancellation_token: CancellationToken,
214        metrics: Arc<MetricsCollection>,
215    ) -> IndexerResult<()> {
216        // After finishing backfill to the initial snapshot tip, the chain may have advanced.
217        // Perform a final catch-up range to the latest best height to close any gap before live.
218        let snapshot = state.snapshot().await;
219        let last_indexed = snapshot.last_indexed;
220        let best_now = self.tip.best_height().await?;
221
222        if best_now > last_indexed {
223            let start = last_indexed.saturating_add(1);
224            let end = best_now;
225            log::info!(
226                "sync: catch-up backfill from {} to current best {} before enabling live",
227                start,
228                end
229            );
230
231            let (bf_tx, mut bf_rx) = mpsc::channel::<(Updates, DatasourceId)>(1_000);
232            let forward_sender_clone = sender.clone();
233            let forward_handle = tokio::spawn(async move {
234                while let Some((u, ds)) = bf_rx.recv().await {
235                    let _ = forward_sender_clone.send((u, ds)).await;
236                }
237            });
238
239            self.backfill
240                .backfill_range(
241                    start,
242                    end,
243                    bf_tx,
244                    cancellation_token.clone(),
245                    metrics.clone(),
246                )
247                .await?;
248            let _ = forward_handle.await;
249
250            self.checkpoint.set_last_indexed_height(end).await?;
251            state.set_last_indexed(end, true).await;
252            metrics
253                .update_gauge("sync_last_indexed", end as f64)
254                .await?;
255        }
256
257        Ok(())
258    }
259    pub fn new(
260        tip: Arc<dyn TipSource>,
261        checkpoint: Arc<dyn CheckpointStore>,
262        backfill: Arc<dyn BackfillSource>,
263        live: Arc<dyn LiveSource>,
264        update_types: Vec<UpdateType>,
265        config: SyncConfig,
266    ) -> Self {
267        Self {
268            tip,
269            checkpoint,
270            backfill,
271            live,
272            update_types,
273            config,
274        }
275    }
276}
277
278impl SyncingDatasource {
279    async fn initialize_state_and_metrics(
280        &self,
281        metrics: Arc<MetricsCollection>,
282    ) -> IndexerResult<(u64, u64, u64)> {
283        let last_indexed = self.checkpoint.last_indexed_height().await?;
284        let best_height = self.tip.best_height().await?;
285        let safe_tip = best_height;
286
287        metrics
288            .update_gauge("sync_best_height", best_height as f64)
289            .await?;
290        metrics
291            .update_gauge("sync_safe_tip", safe_tip as f64)
292            .await?;
293        metrics
294            .update_gauge("sync_last_indexed", last_indexed as f64)
295            .await?;
296
297        Ok((last_indexed, best_height, safe_tip))
298    }
299
300    fn spawn_live_forwarder(
301        &self,
302        live_rx: tokio::sync::mpsc::Receiver<(Updates, DatasourceId)>,
303        state: Arc<SyncState>,
304        forward_sender: tokio::sync::mpsc::Sender<(Updates, DatasourceId)>,
305        checkpoint: Arc<dyn CheckpointStore>,
306        max_buffer: usize,
307        cancellation_token: CancellationToken,
308        metrics: Arc<MetricsCollection>,
309    ) {
310        let live_token_forwarder = cancellation_token;
311        let live_metrics_forwarder = metrics;
312
313        tokio::spawn(async move {
314            // Initialize last persisted height from checkpoint
315            let mut last_persisted_height: u64 = match checkpoint.last_indexed_height().await {
316                Ok(h) => h,
317                Err(_) => 0,
318            };
319            let mut buffer: VecDeque<(Updates, DatasourceId)> = VecDeque::new();
320            let mut flushed_after_enable = false;
321            let mut live_rx = live_rx;
322            loop {
323                tokio::select! {
324                    _ = live_token_forwarder.cancelled() => {
325                        break;
326                    }
327                    maybe_msg = live_rx.recv() => {
328                        match maybe_msg {
329                            Some((updates, datasource_id)) => {
330                                let snapshot = state.snapshot().await;
331                                if snapshot.pass_through_enabled {
332                                    if !flushed_after_enable {
333                                        // Flush buffered updates using current cutoff
334                                        while let Some((mut u, ds_id)) = buffer.pop_front() {
335                                            if let Some(filtered) = filter_by_cutoff(&mut u, snapshot.cutoff_height) {
336                                                if let Some(max_h) = max_height_in_updates(&filtered) {
337                                                    if max_h > last_persisted_height {
338                                                        let _ = checkpoint.set_last_indexed_height(max_h).await;
339                                                        let _ = live_metrics_forwarder.update_gauge("sync_last_indexed", max_h as f64).await;
340                                                        last_persisted_height = max_h;
341                                                        // Update state with new last_indexed
342                                                        state.set_last_indexed(max_h, false).await;
343                                                    }
344                                                }
345                                                let _ = forward_sender.send((filtered, ds_id)).await;
346                                            }
347                                            let _ = live_metrics_forwarder.increment_counter("sync_live_buffer_flushed", 1).await;
348                                        }
349                                        flushed_after_enable = true;
350                                    }
351                                    // Process current update with latest cutoff
352                                    let current_snapshot = state.snapshot().await;
353                                    let mut u = updates;
354                                    if let Some(filtered) = filter_by_cutoff(&mut u, current_snapshot.cutoff_height) {
355                                        if let Some(max_h) = max_height_in_updates(&filtered) {
356                                            if max_h > last_persisted_height {
357                                                let _ = checkpoint.set_last_indexed_height(max_h).await;
358                                                let _ = live_metrics_forwarder.update_gauge("sync_last_indexed", max_h as f64).await;
359                                                last_persisted_height = max_h;
360                                                // Update state with new last_indexed
361                                                state.set_last_indexed(max_h, false).await;
362                                            }
363                                        }
364                                        let _ = forward_sender.send((filtered, datasource_id)).await;
365                                    }
366                                } else {
367                                    if buffer.len() >= max_buffer {
368                                        buffer.pop_front();
369                                        let _ = live_metrics_forwarder.increment_counter("sync_live_buffer_overflow", 1).await;
370                                    }
371                                    buffer.push_back((updates, datasource_id));
372                                    let _ = live_metrics_forwarder.increment_counter("sync_live_buffered", 1).await;
373                                }
374                            }
375                            None => break,
376                        }
377                    }
378                }
379            }
380        });
381    }
382
383    fn spawn_live_stream(
384        &self,
385        state: Arc<SyncState>,
386        live_tx: tokio::sync::mpsc::Sender<(Updates, DatasourceId)>,
387        cancellation_token: CancellationToken,
388        metrics: Arc<MetricsCollection>,
389        reconnection_notifier: tokio::sync::mpsc::Sender<()>,
390    ) {
391        let live_source = Arc::clone(&self.live);
392        tokio::spawn(async move {
393            // Get initial start height from state
394            let snapshot = state.snapshot().await;
395            let start_from_height_exclusive = snapshot.last_indexed;
396            let _ = live_source
397                .consume_live(
398                    start_from_height_exclusive,
399                    live_tx,
400                    cancellation_token,
401                    metrics,
402                    reconnection_notifier,
403                )
404                .await;
405        });
406    }
407
408    fn spawn_reconciliation_listener(
409        &self,
410        state: Arc<SyncState>,
411        sender: mpsc::Sender<(Updates, DatasourceId)>,
412        cancellation_token: CancellationToken,
413        metrics: Arc<MetricsCollection>,
414        reconnect_rx: mpsc::Receiver<()>,
415    ) {
416        let datasource_for_reconcile = self.clone();
417        let state_reconcile = Arc::clone(&state);
418        let sender_reconcile = sender;
419        let cancel_reconcile = cancellation_token;
420        let metrics_reconcile = metrics;
421        let mut reconnect_rx = reconnect_rx;
422
423        tokio::spawn(async move {
424            loop {
425                tokio::select! {
426                    _ = cancel_reconcile.cancelled() => {
427                        break;
428                    }
429                    maybe_notification = reconnect_rx.recv() => {
430                        match maybe_notification {
431                            Some(()) => {
432                                if let Err(e) = datasource_for_reconcile.reconcile_if_behind(
433                                    &state_reconcile,
434                                    sender_reconcile.clone(),
435                                    cancel_reconcile.clone(),
436                                    metrics_reconcile.clone(),
437                                ).await {
438                                    log::error!("sync: reconciliation error: {}", e);
439                                }
440                            }
441                            None => break,
442                        }
443                    }
444                }
445            }
446        });
447    }
448
449    async fn backfill_if_needed(
450        &self,
451        state: &SyncState,
452        final_target_tip: u64,
453        sender: tokio::sync::mpsc::Sender<(Updates, DatasourceId)>,
454        cancellation_token: CancellationToken,
455        metrics: Arc<MetricsCollection>,
456    ) -> IndexerResult<()> {
457        let snapshot = state.snapshot().await;
458        let last_indexed = snapshot.last_indexed;
459
460        if last_indexed < final_target_tip {
461            log::info!(
462                "sync: entering backfill mode from {} toward safe tip (best={}, initial_safe_tip={})",
463                last_indexed + 1,
464                final_target_tip,
465                final_target_tip
466            );
467
468            state
469                .set_phase(SyncPhase::Backfilling {
470                    target_tip: final_target_tip,
471                })
472                .await;
473            let mut current = last_indexed.saturating_add(1);
474            let mut batches_processed: u64 = 0;
475            let start_time = std::time::Instant::now();
476            loop {
477                if cancellation_token.is_cancelled() {
478                    log::info!("sync: cancellation received during backfill");
479                    return Ok(());
480                }
481
482                // Update metrics with current best, but do not move the backfill target beyond final_target_tip
483                let best_height_now = self.tip.best_height().await?;
484                metrics
485                    .update_gauge("sync_best_height", best_height_now as f64)
486                    .await?;
487                metrics
488                    .update_gauge("sync_safe_tip", final_target_tip as f64)
489                    .await?;
490                state.set_safe_tip(final_target_tip).await;
491
492                if current > final_target_tip {
493                    break;
494                }
495
496                let end = std::cmp::min(
497                    final_target_tip,
498                    current.saturating_add(self.config.batch_size.saturating_sub(1)),
499                );
500
501                metrics
502                    .increment_counter("sync_backfill_batches", 1)
503                    .await?;
504                batches_processed = batches_processed.saturating_add(1);
505
506                let (bf_tx, mut bf_rx) = mpsc::channel::<(Updates, DatasourceId)>(1_000);
507                let forward_sender_clone = sender.clone();
508                let forward_handle = tokio::spawn(async move {
509                    while let Some((u, ds)) = bf_rx.recv().await {
510                        let _ = forward_sender_clone.send((u, ds)).await;
511                    }
512                });
513
514                self.backfill
515                    .backfill_range(
516                        current,
517                        end,
518                        bf_tx,
519                        cancellation_token.clone(),
520                        metrics.clone(),
521                    )
522                    .await?;
523                let _ = forward_handle.await;
524
525                self.checkpoint.set_last_indexed_height(end).await?;
526                state.set_last_indexed(end, true).await;
527                metrics
528                    .update_gauge("sync_last_indexed", end as f64)
529                    .await?;
530
531                current = end.saturating_add(1);
532
533                // Periodic progress log with ETA
534                if batches_processed % self.config.progress_log_every_batches == 0 {
535                    let snapshot = state.snapshot().await;
536                    let elapsed = start_time.elapsed().as_secs_f64();
537                    let heights_done = (snapshot
538                        .last_indexed
539                        .saturating_sub(current.saturating_sub(1)))
540                    .max(1);
541                    let total_remaining = final_target_tip.saturating_sub(snapshot.last_indexed);
542                    let rate_hps = (heights_done as f64 / elapsed.max(0.001)).max(0.000_001);
543                    let eta_secs = (total_remaining as f64 / rate_hps).round() as u64;
544                    log::info!(
545                        "sync: backfill progress heights=[{}..{}], remaining={}, rate={:.0} h/s, eta={}s",
546                        current.saturating_sub(self.config.batch_size.saturating_sub(1)),
547                        snapshot.last_indexed,
548                        total_remaining,
549                        rate_hps,
550                        eta_secs
551                    );
552                }
553
554                // Live stream is started at the beginning of sync to buffer new events.
555                // We no longer defer starting live based on distance to tip here.
556            }
557        } else {
558            log::info!(
559                "sync: already at or past safe tip (last_indexed={}, safe_tip={}), entering live",
560                last_indexed,
561                final_target_tip
562            );
563        }
564
565        Ok(())
566    }
567
568    async fn enable_live_passthrough(
569        &self,
570        state: &SyncState,
571        metrics: Arc<MetricsCollection>,
572    ) -> IndexerResult<()> {
573        // Set cutoff to the last fully backfilled height and enable pass-through
574        let snapshot = state.snapshot().await;
575        state.set_cutoff_height(snapshot.last_indexed).await;
576        state.set_pass_through(true).await;
577        metrics
578            .update_gauge("sync_live_passthrough_enabled", 1.0)
579            .await?;
580        log::info!("sync: live pass-through enabled; backfill complete");
581        Ok(())
582    }
583
584    async fn reconcile_if_behind(
585        &self,
586        state: &SyncState,
587        sender: mpsc::Sender<(Updates, DatasourceId)>,
588        cancellation_token: CancellationToken,
589        metrics: Arc<MetricsCollection>,
590    ) -> IndexerResult<()> {
591        let snapshot = state.snapshot().await;
592        let last = snapshot.last_indexed;
593        let best = self.tip.best_height().await?;
594
595        if best > last {
596            log::info!(
597                "sync: reconciliation triggered after reconnection (last={}, best={})",
598                last,
599                best
600            );
601
602            // Disable passthrough to buffer live updates
603            state.set_pass_through(false).await;
604            state
605                .set_phase(SyncPhase::Reconnecting { target_tip: best })
606                .await;
607            metrics
608                .update_gauge("sync_live_passthrough_enabled", 0.0)
609                .await?;
610
611            // Set cutoff to current last_indexed
612            state.set_cutoff_height(last).await;
613
614            // Backfill the gap
615            self.backfill_if_needed(
616                state,
617                best,
618                sender.clone(),
619                cancellation_token.clone(),
620                metrics.clone(),
621            )
622            .await?;
623
624            // Catch up to current tip
625            self.catch_up_to_current_tip(
626                state,
627                sender.clone(),
628                cancellation_token.clone(),
629                metrics.clone(),
630            )
631            .await?;
632
633            // Re-enable passthrough
634            self.enable_live_passthrough(state, metrics.clone()).await?;
635
636            let final_snapshot = state.snapshot().await;
637            log::info!(
638                "sync: reconciliation complete (last_indexed={})",
639                final_snapshot.last_indexed
640            );
641        }
642
643        Ok(())
644    }
645}
646
647#[async_trait]
648impl Datasource for SyncingDatasource {
649    async fn consume(
650        &self,
651        _id: DatasourceId,
652        sender: mpsc::Sender<(Updates, DatasourceId)>,
653        cancellation_token: CancellationToken,
654        metrics: Arc<MetricsCollection>,
655    ) -> IndexerResult<()> {
656        let (last_indexed, _best_height, safe_tip) =
657            self.initialize_state_and_metrics(metrics.clone()).await?;
658
659        // Create shared state
660        let state = Arc::new(SyncState::new(last_indexed, safe_tip));
661
662        // Prepare live channel and forwarder (buffer until pass-through)
663        let (live_tx, live_rx) = mpsc::channel::<(Updates, DatasourceId)>(1_000);
664        self.spawn_live_forwarder(
665            live_rx,
666            Arc::clone(&state),
667            sender.clone(),
668            Arc::clone(&self.checkpoint),
669            self.config.max_live_buffer,
670            cancellation_token.clone(),
671            metrics.clone(),
672        );
673
674        // Create reconnection notification channel
675        let (reconnect_tx, reconnect_rx) = mpsc::channel::<()>(10);
676
677        // Spawn reconciliation listener task
678        self.spawn_reconciliation_listener(
679            Arc::clone(&state),
680            sender.clone(),
681            cancellation_token.clone(),
682            metrics.clone(),
683            reconnect_rx,
684        );
685
686        // Start live immediately to buffer new events while we backfill.
687        self.spawn_live_stream(
688            Arc::clone(&state),
689            live_tx.clone(),
690            cancellation_token.clone(),
691            metrics.clone(),
692            reconnect_tx,
693        );
694
695        self.backfill_if_needed(
696            &state,
697            safe_tip,
698            sender.clone(),
699            cancellation_token.clone(),
700            metrics.clone(),
701        )
702        .await?;
703
704        // Before enabling passthrough, close any gap that formed while syncing to the initial snapshot tip.
705        self.catch_up_to_current_tip(
706            &state,
707            sender.clone(),
708            cancellation_token.clone(),
709            metrics.clone(),
710        )
711        .await?;
712
713        self.enable_live_passthrough(&state, metrics.clone())
714            .await?;
715
716        // Live already started at the beginning; no action needed here.
717
718        cancellation_token.cancelled().await;
719        Ok(())
720    }
721
722    fn update_types(&self) -> Vec<UpdateType> {
723        self.update_types.clone()
724    }
725}
726
727// Height-based filtering without signature dedup: we rely on the height present in updates.
728fn filter_by_cutoff(updates: &mut Updates, cutoff_height: u64) -> Option<Updates> {
729    match updates {
730        Updates::Transactions(txs) => {
731            let mut filtered = Vec::new();
732            for t in txs.iter() {
733                if t.height > cutoff_height {
734                    filtered.push(t.clone());
735                }
736            }
737            if filtered.is_empty() {
738                None
739            } else {
740                Some(Updates::Transactions(filtered))
741            }
742        }
743        Updates::BlockDetails(blocks) => {
744            let filtered: Vec<crate::datasource::BlockDetails> = blocks
745                .iter()
746                .cloned()
747                .filter(|b| b.height > cutoff_height)
748                .collect();
749            if filtered.is_empty() {
750                None
751            } else {
752                Some(Updates::BlockDetails(filtered))
753            }
754        }
755        Updates::Accounts(accounts) => {
756            let filtered: Vec<crate::datasource::AccountUpdate> = accounts
757                .iter()
758                .cloned()
759                .filter(|a| a.height > cutoff_height)
760                .collect();
761            if filtered.is_empty() {
762                None
763            } else {
764                Some(Updates::Accounts(filtered))
765            }
766        }
767        Updates::AccountDeletions(deletions) => {
768            let filtered: Vec<crate::datasource::AccountDeletion> = deletions
769                .iter()
770                .cloned()
771                .filter(|d| d.height > cutoff_height)
772                .collect();
773            if filtered.is_empty() {
774                None
775            } else {
776                Some(Updates::AccountDeletions(filtered))
777            }
778        }
779        Updates::BitcoinBlocks(blocks) => {
780            let filtered: Vec<crate::datasource::BitcoinBlock> = blocks
781                .iter()
782                .cloned()
783                .filter(|b| b.block_height > cutoff_height)
784                .collect();
785            if filtered.is_empty() {
786                None
787            } else {
788                Some(Updates::BitcoinBlocks(filtered))
789            }
790        }
791        Updates::RolledbackTransactions(events) => {
792            Some(Updates::RolledbackTransactions(events.clone()))
793        }
794        Updates::ReappliedTransactions(events) => {
795            Some(Updates::ReappliedTransactions(events.clone()))
796        }
797    }
798}
799
800// Determine the maximum height carried by an `Updates` payload for checkpointing
801fn max_height_in_updates(updates: &Updates) -> Option<u64> {
802    match updates {
803        Updates::Transactions(txs) => txs.iter().map(|t| t.height).max(),
804        Updates::BlockDetails(blocks) => blocks.iter().map(|b| b.height).max(),
805        Updates::Accounts(accts) => accts.iter().map(|a| a.height).max(),
806        Updates::AccountDeletions(dels) => dels.iter().map(|d| d.height).max(),
807        Updates::BitcoinBlocks(blocks) => blocks.iter().map(|b| b.block_height).max(),
808        Updates::RolledbackTransactions(_) => None,
809        Updates::ReappliedTransactions(_) => None,
810    }
811}
812
813#[cfg(test)]
814mod tests {
815    use std::sync::atomic::{AtomicU64, Ordering};
816    use std::sync::Arc;
817    use tokio::sync::mpsc;
818    use tokio_util::sync::CancellationToken;
819
820    use super::*;
821    use crate::datasource::{
822        AccountDeletion, AccountUpdate, BitcoinBlock, BlockDetails, TransactionUpdate, UpdateType,
823    };
824    use crate::error::IndexerResult;
825    use crate::metrics::MetricsCollection;
826
827    // Mock implementations for testing
828
829    struct MockTipSource {
830        height: Arc<AtomicU64>,
831    }
832
833    impl MockTipSource {
834        fn new(height: u64) -> Self {
835            Self {
836                height: Arc::new(AtomicU64::new(height)),
837            }
838        }
839
840        fn set_height(&self, height: u64) {
841            self.height.store(height, Ordering::SeqCst);
842        }
843    }
844
845    #[async_trait]
846    impl TipSource for MockTipSource {
847        async fn best_height(&self) -> IndexerResult<u64> {
848            Ok(self.height.load(Ordering::SeqCst))
849        }
850    }
851
852    struct MockCheckpointStore {
853        last_indexed: Arc<AtomicU64>,
854    }
855
856    impl MockCheckpointStore {
857        fn new(last_indexed: u64) -> Self {
858            Self {
859                last_indexed: Arc::new(AtomicU64::new(last_indexed)),
860            }
861        }
862
863        fn get_last_indexed(&self) -> u64 {
864            self.last_indexed.load(Ordering::SeqCst)
865        }
866    }
867
868    #[async_trait]
869    impl CheckpointStore for MockCheckpointStore {
870        async fn last_indexed_height(&self) -> IndexerResult<u64> {
871            Ok(self.last_indexed.load(Ordering::SeqCst))
872        }
873
874        async fn set_last_indexed_height(&self, height: u64) -> IndexerResult<()> {
875            self.last_indexed.store(height, Ordering::SeqCst);
876            Ok(())
877        }
878    }
879
880    struct MockBackfillSource {
881        updates: Vec<(u64, u64, Updates)>, // (start, end, updates)
882    }
883
884    impl MockBackfillSource {
885        fn new() -> Self {
886            Self {
887                updates: Vec::new(),
888            }
889        }
890
891        fn add_range(&mut self, start: u64, end: u64, updates: Updates) {
892            self.updates.push((start, end, updates));
893        }
894    }
895
896    #[async_trait]
897    impl BackfillSource for MockBackfillSource {
898        async fn backfill_range(
899            &self,
900            start_height_inclusive: u64,
901            end_height_inclusive: u64,
902            sender: mpsc::Sender<(Updates, DatasourceId)>,
903            _cancellation_token: CancellationToken,
904            _metrics: Arc<MetricsCollection>,
905        ) -> IndexerResult<()> {
906            for (start, end, updates) in &self.updates {
907                if *start == start_height_inclusive && *end == end_height_inclusive {
908                    let _ = sender
909                        .send((updates.clone(), DatasourceId::new_unique()))
910                        .await;
911                    return Ok(());
912                }
913            }
914            Ok(())
915        }
916    }
917
918    struct MockLiveSource {
919        updates: Vec<Updates>,
920        should_reconnect: bool,
921    }
922
923    impl MockLiveSource {
924        fn new() -> Self {
925            Self {
926                updates: Vec::new(),
927                should_reconnect: false,
928            }
929        }
930
931        fn add_update(&mut self, update: Updates) {
932            self.updates.push(update);
933        }
934
935        fn set_should_reconnect(&mut self, should: bool) {
936            self.should_reconnect = should;
937        }
938    }
939
940    #[async_trait]
941    impl LiveSource for MockLiveSource {
942        async fn consume_live(
943            &self,
944            _start_from_height_exclusive: u64,
945            sender: mpsc::Sender<(Updates, DatasourceId)>,
946            cancellation_token: CancellationToken,
947            _metrics: Arc<MetricsCollection>,
948            reconnection_notifier: mpsc::Sender<()>,
949        ) -> IndexerResult<()> {
950            let updates = self.updates.clone();
951            let should_reconnect = self.should_reconnect;
952            tokio::spawn(async move {
953                for update in updates {
954                    if cancellation_token.is_cancelled() {
955                        break;
956                    }
957                    let _ = sender.send((update, DatasourceId::new_unique())).await;
958                    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
959                }
960                if should_reconnect {
961                    let _ = reconnection_notifier.send(()).await;
962                }
963            });
964            Ok(())
965        }
966
967        fn update_types(&self) -> Vec<UpdateType> {
968            vec![UpdateType::Transaction]
969        }
970    }
971
972    fn create_test_updates(start_height: u64, count: u64) -> Updates {
973        use arch_program::hash::Hash;
974        use arch_program::sanitized::{ArchMessage, MessageHeader};
975        use arch_sdk::{RollbackStatus, RuntimeTransaction, Status};
976
977        let mut txs = Vec::new();
978        for i in 0..count {
979            txs.push(TransactionUpdate {
980                transaction: arch_sdk::ProcessedTransaction {
981                    runtime_transaction: RuntimeTransaction {
982                        version: 1,
983                        signatures: vec![],
984                        message: ArchMessage {
985                            header: MessageHeader {
986                                num_readonly_signed_accounts: 0,
987                                num_readonly_unsigned_accounts: 0,
988                                num_required_signatures: 0,
989                            },
990                            account_keys: vec![],
991                            instructions: vec![],
992                            recent_blockhash: Hash::default(),
993                        },
994                    },
995                    inner_instructions_list: vec![],
996                    status: Status::Processed,
997                    bitcoin_txid: None,
998                    logs: vec![],
999                    rollback_status: RollbackStatus::NotRolledback,
1000                },
1001                height: start_height + i,
1002            });
1003        }
1004        Updates::Transactions(txs)
1005    }
1006
1007    fn create_test_block_details(start_height: u64, count: u64) -> Updates {
1008        let mut blocks = Vec::new();
1009        for i in 0..count {
1010            blocks.push(BlockDetails {
1011                height: start_height + i,
1012                block_hash: None,
1013                previous_block_hash: None,
1014                block_time: None,
1015                block_height: None,
1016            });
1017        }
1018        Updates::BlockDetails(blocks)
1019    }
1020
1021    fn create_test_accounts(start_height: u64, count: u64) -> Updates {
1022        let mut accounts = Vec::new();
1023        for i in 0..count {
1024            accounts.push(AccountUpdate {
1025                pubkey: arch_program::pubkey::Pubkey::default(),
1026                account: arch_sdk::AccountInfo {
1027                    lamports: 0,
1028                    owner: arch_program::pubkey::Pubkey::default(),
1029                    data: vec![],
1030                    utxo: String::new(),
1031                    is_executable: false,
1032                },
1033                height: start_height + i,
1034            });
1035        }
1036        Updates::Accounts(accounts)
1037    }
1038
1039    fn create_test_bitcoin_blocks(start_height: u64, count: u64) -> Updates {
1040        let mut blocks = Vec::new();
1041        for i in 0..count {
1042            blocks.push(BitcoinBlock {
1043                block_height: start_height + i,
1044                block_hash: format!("hash_{}", start_height + i),
1045            });
1046        }
1047        Updates::BitcoinBlocks(blocks)
1048    }
1049
1050    // Test SyncState
1051
1052    #[tokio::test]
1053    async fn test_sync_state_initialization() {
1054        let state = SyncState::new(100, 200);
1055        let snapshot = state.snapshot().await;
1056
1057        assert_eq!(snapshot.last_indexed, 100);
1058        assert_eq!(snapshot.safe_tip, 200);
1059        assert_eq!(snapshot.cutoff_height, 0);
1060        assert!(!snapshot.pass_through_enabled);
1061        match snapshot.phase {
1062            SyncPhase::Backfilling { target_tip } => assert_eq!(target_tip, 200),
1063            _ => panic!("Expected Backfilling phase"),
1064        }
1065    }
1066
1067    #[tokio::test]
1068    async fn test_sync_state_set_last_indexed() {
1069        let state = SyncState::new(100, 200);
1070
1071        state.set_last_indexed(150, true).await;
1072        let snapshot = state.snapshot().await;
1073        assert_eq!(snapshot.last_indexed, 150);
1074        assert_eq!(snapshot.cutoff_height, 150);
1075
1076        state.set_last_indexed(175, false).await;
1077        let snapshot = state.snapshot().await;
1078        assert_eq!(snapshot.last_indexed, 175);
1079        assert_eq!(snapshot.cutoff_height, 150); // Should not update
1080    }
1081
1082    #[tokio::test]
1083    async fn test_sync_state_set_pass_through() {
1084        let state = SyncState::new(100, 200);
1085
1086        state.set_pass_through(true).await;
1087        let snapshot = state.snapshot().await;
1088        assert!(snapshot.pass_through_enabled);
1089        match snapshot.phase {
1090            SyncPhase::LivePassthrough => {}
1091            _ => panic!("Expected LivePassthrough phase"),
1092        }
1093
1094        state.set_pass_through(false).await;
1095        let snapshot = state.snapshot().await;
1096        assert!(!snapshot.pass_through_enabled);
1097    }
1098
1099    #[tokio::test]
1100    async fn test_sync_state_set_phase() {
1101        let state = SyncState::new(100, 200);
1102
1103        state
1104            .set_phase(SyncPhase::Reconnecting { target_tip: 250 })
1105            .await;
1106        let snapshot = state.snapshot().await;
1107        match snapshot.phase {
1108            SyncPhase::Reconnecting { target_tip } => assert_eq!(target_tip, 250),
1109            _ => panic!("Expected Reconnecting phase"),
1110        }
1111    }
1112
1113    #[tokio::test]
1114    async fn test_sync_state_set_safe_tip() {
1115        let state = SyncState::new(100, 200);
1116
1117        state.set_safe_tip(300).await;
1118        let snapshot = state.snapshot().await;
1119        assert_eq!(snapshot.safe_tip, 300);
1120    }
1121
1122    // Test filter_by_cutoff
1123
1124    #[test]
1125    fn test_filter_by_cutoff_transactions() {
1126        let mut updates = create_test_updates(100, 10);
1127        let filtered = filter_by_cutoff(&mut updates, 105);
1128
1129        assert!(filtered.is_some());
1130        if let Some(Updates::Transactions(txs)) = filtered {
1131            assert_eq!(txs.len(), 4); // Heights 106, 107, 108, 109
1132            assert_eq!(txs[0].height, 106);
1133            assert_eq!(txs[3].height, 109);
1134        } else {
1135            panic!("Expected Transactions variant");
1136        }
1137    }
1138
1139    #[test]
1140    fn test_filter_by_cutoff_transactions_all_below() {
1141        let mut updates = create_test_updates(100, 10);
1142        let filtered = filter_by_cutoff(&mut updates, 150);
1143
1144        assert!(filtered.is_none());
1145    }
1146
1147    #[test]
1148    fn test_filter_by_cutoff_transactions_all_above() {
1149        let mut updates = create_test_updates(100, 10);
1150        let filtered = filter_by_cutoff(&mut updates, 50);
1151
1152        assert!(filtered.is_some());
1153        if let Some(Updates::Transactions(txs)) = filtered {
1154            assert_eq!(txs.len(), 10);
1155        } else {
1156            panic!("Expected Transactions variant");
1157        }
1158    }
1159
1160    #[test]
1161    fn test_filter_by_cutoff_block_details() {
1162        let mut updates = create_test_block_details(100, 10);
1163        let filtered = filter_by_cutoff(&mut updates, 105);
1164
1165        assert!(filtered.is_some());
1166        if let Some(Updates::BlockDetails(blocks)) = filtered {
1167            assert_eq!(blocks.len(), 4);
1168            assert_eq!(blocks[0].height, 106);
1169        } else {
1170            panic!("Expected BlockDetails variant");
1171        }
1172    }
1173
1174    #[test]
1175    fn test_filter_by_cutoff_accounts() {
1176        let mut updates = create_test_accounts(100, 10);
1177        let filtered = filter_by_cutoff(&mut updates, 105);
1178
1179        assert!(filtered.is_some());
1180        if let Some(Updates::Accounts(accounts)) = filtered {
1181            assert_eq!(accounts.len(), 4);
1182            assert_eq!(accounts[0].height, 106);
1183        } else {
1184            panic!("Expected Accounts variant");
1185        }
1186    }
1187
1188    #[test]
1189    fn test_filter_by_cutoff_bitcoin_blocks() {
1190        let mut updates = create_test_bitcoin_blocks(100, 10);
1191        let filtered = filter_by_cutoff(&mut updates, 105);
1192
1193        assert!(filtered.is_some());
1194        if let Some(Updates::BitcoinBlocks(blocks)) = filtered {
1195            assert_eq!(blocks.len(), 4);
1196            assert_eq!(blocks[0].block_height, 106);
1197        } else {
1198            panic!("Expected BitcoinBlocks variant");
1199        }
1200    }
1201
1202    #[test]
1203    fn test_filter_by_cutoff_account_deletions() {
1204        let mut deletions = Vec::new();
1205        for i in 0..10 {
1206            deletions.push(AccountDeletion {
1207                pubkey: arch_program::pubkey::Pubkey::default(),
1208                height: 100 + i,
1209            });
1210        }
1211        let mut updates = Updates::AccountDeletions(deletions);
1212        let filtered = filter_by_cutoff(&mut updates, 105);
1213
1214        assert!(filtered.is_some());
1215        if let Some(Updates::AccountDeletions(dels)) = filtered {
1216            assert_eq!(dels.len(), 4);
1217            assert_eq!(dels[0].height, 106);
1218        } else {
1219            panic!("Expected AccountDeletions variant");
1220        }
1221    }
1222
1223    #[test]
1224    fn test_filter_by_cutoff_rollbacked_transactions() {
1225        let events = vec![crate::datasource::RolledbackTransactionsEvent {
1226            height: 100,
1227            transaction_hashes: vec!["hash1".to_string()],
1228        }];
1229        let mut updates = Updates::RolledbackTransactions(events.clone());
1230        let filtered = filter_by_cutoff(&mut updates, 150);
1231
1232        // RolledbackTransactions should always pass through
1233        assert!(filtered.is_some());
1234        if let Some(Updates::RolledbackTransactions(filtered_events)) = filtered {
1235            assert_eq!(filtered_events.len(), 1);
1236        } else {
1237            panic!("Expected RolledbackTransactions variant");
1238        }
1239    }
1240
1241    // Test max_height_in_updates
1242
1243    #[test]
1244    fn test_max_height_in_updates_transactions() {
1245        let updates = create_test_updates(100, 10);
1246        let max_height = max_height_in_updates(&updates);
1247        assert_eq!(max_height, Some(109));
1248    }
1249
1250    #[test]
1251    fn test_max_height_in_updates_block_details() {
1252        let updates = create_test_block_details(50, 5);
1253        let max_height = max_height_in_updates(&updates);
1254        assert_eq!(max_height, Some(54));
1255    }
1256
1257    #[test]
1258    fn test_max_height_in_updates_accounts() {
1259        let updates = create_test_accounts(200, 3);
1260        let max_height = max_height_in_updates(&updates);
1261        assert_eq!(max_height, Some(202));
1262    }
1263
1264    #[test]
1265    fn test_max_height_in_updates_bitcoin_blocks() {
1266        let updates = create_test_bitcoin_blocks(300, 4);
1267        let max_height = max_height_in_updates(&updates);
1268        assert_eq!(max_height, Some(303));
1269    }
1270
1271    #[test]
1272    fn test_max_height_in_updates_empty() {
1273        let updates = Updates::Transactions(Vec::new());
1274        let max_height = max_height_in_updates(&updates);
1275        assert_eq!(max_height, None);
1276    }
1277
1278    #[test]
1279    fn test_max_height_in_updates_rollbacked() {
1280        let events = vec![crate::datasource::RolledbackTransactionsEvent {
1281            height: 100,
1282            transaction_hashes: vec!["hash1".to_string()],
1283        }];
1284        let updates = Updates::RolledbackTransactions(events);
1285        let max_height = max_height_in_updates(&updates);
1286        assert_eq!(max_height, None);
1287    }
1288
1289    // Test SyncingDatasource methods
1290
1291    #[tokio::test]
1292    async fn test_initialize_state_and_metrics() {
1293        let tip = Arc::new(MockTipSource::new(500));
1294        let checkpoint = Arc::new(MockCheckpointStore::new(100));
1295        let backfill = Arc::new(MockBackfillSource::new());
1296        let live = Arc::new(MockLiveSource::new());
1297        let metrics = Arc::new(MetricsCollection::default());
1298
1299        let datasource = SyncingDatasource::new(
1300            tip,
1301            checkpoint,
1302            backfill,
1303            live,
1304            vec![UpdateType::Transaction],
1305            SyncConfig::default(),
1306        );
1307
1308        let (last_indexed, best_height, safe_tip) = datasource
1309            .initialize_state_and_metrics(metrics)
1310            .await
1311            .unwrap();
1312
1313        assert_eq!(last_indexed, 100);
1314        assert_eq!(best_height, 500);
1315        assert_eq!(safe_tip, 500);
1316    }
1317
1318    #[tokio::test]
1319    async fn test_backfill_if_needed_no_backfill() {
1320        let tip = Arc::new(MockTipSource::new(500));
1321        let checkpoint = Arc::new(MockCheckpointStore::new(500));
1322        let backfill = Arc::new(MockBackfillSource::new());
1323        let live = Arc::new(MockLiveSource::new());
1324        let metrics = Arc::new(MetricsCollection::default());
1325
1326        let datasource = SyncingDatasource::new(
1327            tip,
1328            checkpoint,
1329            backfill,
1330            live,
1331            vec![UpdateType::Transaction],
1332            SyncConfig::default(),
1333        );
1334
1335        let state = Arc::new(SyncState::new(500, 500));
1336        let (tx, _rx) = mpsc::channel(1000);
1337        let cancel = CancellationToken::new();
1338
1339        let result = datasource
1340            .backfill_if_needed(&state, 500, tx, cancel, metrics)
1341            .await;
1342
1343        assert!(result.is_ok());
1344        let snapshot = state.snapshot().await;
1345        assert_eq!(snapshot.last_indexed, 500);
1346    }
1347
1348    #[tokio::test]
1349    async fn test_backfill_if_needed_with_backfill() {
1350        let tip = Arc::new(MockTipSource::new(500));
1351        let checkpoint = Arc::new(MockCheckpointStore::new(100));
1352        let mut mock_backfill = MockBackfillSource::new();
1353        mock_backfill.add_range(101, 200, create_test_updates(101, 100));
1354        let backfill = Arc::new(mock_backfill);
1355        let live = Arc::new(MockLiveSource::new());
1356        let metrics = Arc::new(MetricsCollection::default());
1357
1358        let datasource = SyncingDatasource::new(
1359            tip,
1360            checkpoint,
1361            backfill,
1362            live,
1363            vec![UpdateType::Transaction],
1364            SyncConfig {
1365                batch_size: 100,
1366                ..Default::default()
1367            },
1368        );
1369
1370        let state = Arc::new(SyncState::new(100, 200));
1371        let (tx, mut rx) = mpsc::channel(1000);
1372        let cancel = CancellationToken::new();
1373
1374        let result = datasource
1375            .backfill_if_needed(&state, 200, tx.clone(), cancel, metrics)
1376            .await;
1377
1378        assert!(result.is_ok());
1379        let snapshot = state.snapshot().await;
1380        assert_eq!(snapshot.last_indexed, 200);
1381
1382        // Verify updates were forwarded
1383        let mut received_count = 0;
1384        while let Ok(Some(_)) =
1385            tokio::time::timeout(tokio::time::Duration::from_millis(100), rx.recv()).await
1386        {
1387            received_count += 1;
1388        }
1389        assert!(received_count > 0);
1390    }
1391
1392    #[tokio::test]
1393    async fn test_enable_live_passthrough() {
1394        let tip = Arc::new(MockTipSource::new(500));
1395        let checkpoint = Arc::new(MockCheckpointStore::new(200));
1396        let backfill = Arc::new(MockBackfillSource::new());
1397        let live = Arc::new(MockLiveSource::new());
1398        let metrics = Arc::new(MetricsCollection::default());
1399
1400        let datasource = SyncingDatasource::new(
1401            tip,
1402            checkpoint,
1403            backfill,
1404            live,
1405            vec![UpdateType::Transaction],
1406            SyncConfig::default(),
1407        );
1408
1409        let state = Arc::new(SyncState::new(200, 200));
1410        state.set_last_indexed(200, true).await;
1411
1412        let result = datasource.enable_live_passthrough(&state, metrics).await;
1413        assert!(result.is_ok());
1414
1415        let snapshot = state.snapshot().await;
1416        assert!(snapshot.pass_through_enabled);
1417        assert_eq!(snapshot.cutoff_height, 200);
1418        match snapshot.phase {
1419            SyncPhase::LivePassthrough => {}
1420            _ => panic!("Expected LivePassthrough phase"),
1421        }
1422    }
1423
1424    #[tokio::test]
1425    async fn test_catch_up_to_current_tip() {
1426        let tip = Arc::new(MockTipSource::new(250));
1427        let checkpoint = Arc::new(MockCheckpointStore::new(200));
1428        let checkpoint_clone = Arc::clone(&checkpoint);
1429        let mut mock_backfill = MockBackfillSource::new();
1430        mock_backfill.add_range(201, 250, create_test_updates(201, 50));
1431        let backfill = Arc::new(mock_backfill);
1432        let live = Arc::new(MockLiveSource::new());
1433        let metrics = Arc::new(MetricsCollection::default());
1434
1435        let datasource = SyncingDatasource::new(
1436            tip,
1437            checkpoint,
1438            backfill,
1439            live,
1440            vec![UpdateType::Transaction],
1441            SyncConfig::default(),
1442        );
1443
1444        let state = Arc::new(SyncState::new(200, 200));
1445        let (tx, mut rx) = mpsc::channel(1000);
1446        let cancel = CancellationToken::new();
1447
1448        let result = datasource
1449            .catch_up_to_current_tip(&state, tx.clone(), cancel, metrics)
1450            .await;
1451
1452        assert!(result.is_ok());
1453        let snapshot = state.snapshot().await;
1454        assert_eq!(snapshot.last_indexed, 250);
1455        // Check checkpoint separately to avoid borrow checker issues
1456        let checkpoint_height = checkpoint_clone.get_last_indexed();
1457        assert_eq!(checkpoint_height, 250);
1458
1459        // Verify updates were forwarded
1460        let mut received = false;
1461        while let Ok(Some(_)) =
1462            tokio::time::timeout(tokio::time::Duration::from_millis(100), rx.recv()).await
1463        {
1464            received = true;
1465        }
1466        assert!(received);
1467    }
1468
1469    #[tokio::test]
1470    async fn test_catch_up_to_current_tip_no_gap() {
1471        let tip = Arc::new(MockTipSource::new(200));
1472        let checkpoint = Arc::new(MockCheckpointStore::new(200));
1473        let backfill = Arc::new(MockBackfillSource::new());
1474        let live = Arc::new(MockLiveSource::new());
1475        let metrics = Arc::new(MetricsCollection::default());
1476
1477        let datasource = SyncingDatasource::new(
1478            tip,
1479            checkpoint,
1480            backfill,
1481            live,
1482            vec![UpdateType::Transaction],
1483            SyncConfig::default(),
1484        );
1485
1486        let state = Arc::new(SyncState::new(200, 200));
1487        let (tx, _rx) = mpsc::channel(1000);
1488        let cancel = CancellationToken::new();
1489
1490        let result = datasource
1491            .catch_up_to_current_tip(&state, tx, cancel, metrics)
1492            .await;
1493
1494        assert!(result.is_ok());
1495        let snapshot = state.snapshot().await;
1496        assert_eq!(snapshot.last_indexed, 200);
1497    }
1498
1499    #[tokio::test]
1500    async fn test_reconcile_if_behind() {
1501        let tip = Arc::new(MockTipSource::new(300));
1502        let checkpoint = Arc::new(MockCheckpointStore::new(200));
1503        let mut mock_backfill = MockBackfillSource::new();
1504        mock_backfill.add_range(201, 300, create_test_updates(201, 100));
1505        let backfill = Arc::new(mock_backfill);
1506        let live = Arc::new(MockLiveSource::new());
1507        let metrics = Arc::new(MetricsCollection::default());
1508
1509        let datasource = SyncingDatasource::new(
1510            tip,
1511            checkpoint,
1512            backfill,
1513            live,
1514            vec![UpdateType::Transaction],
1515            SyncConfig {
1516                batch_size: 100,
1517                ..Default::default()
1518            },
1519        );
1520
1521        let state = Arc::new(SyncState::new(200, 200));
1522        state.set_pass_through(true).await; // Start in passthrough mode
1523        let (tx, _rx) = mpsc::channel(1000);
1524        let cancel = CancellationToken::new();
1525
1526        let result = datasource
1527            .reconcile_if_behind(&state, tx, cancel, metrics)
1528            .await;
1529
1530        assert!(result.is_ok());
1531        let snapshot = state.snapshot().await;
1532        assert_eq!(snapshot.last_indexed, 300);
1533        assert!(snapshot.pass_through_enabled); // Should be re-enabled after reconciliation
1534    }
1535
1536    #[tokio::test]
1537    async fn test_reconcile_if_behind_no_gap() {
1538        let tip = Arc::new(MockTipSource::new(200));
1539        let checkpoint = Arc::new(MockCheckpointStore::new(200));
1540        let backfill = Arc::new(MockBackfillSource::new());
1541        let live = Arc::new(MockLiveSource::new());
1542        let metrics = Arc::new(MetricsCollection::default());
1543
1544        let datasource = SyncingDatasource::new(
1545            tip,
1546            checkpoint,
1547            backfill,
1548            live,
1549            vec![UpdateType::Transaction],
1550            SyncConfig::default(),
1551        );
1552
1553        let state = Arc::new(SyncState::new(200, 200));
1554        let (tx, _rx) = mpsc::channel(1000);
1555        let cancel = CancellationToken::new();
1556
1557        let result = datasource
1558            .reconcile_if_behind(&state, tx, cancel, metrics)
1559            .await;
1560
1561        assert!(result.is_ok());
1562        let snapshot = state.snapshot().await;
1563        assert_eq!(snapshot.last_indexed, 200);
1564    }
1565
1566    #[tokio::test]
1567    async fn test_backfill_cancellation() {
1568        let tip = Arc::new(MockTipSource::new(500));
1569        let checkpoint = Arc::new(MockCheckpointStore::new(100));
1570        let backfill = Arc::new(MockBackfillSource::new());
1571        let live = Arc::new(MockLiveSource::new());
1572        let metrics = Arc::new(MetricsCollection::default());
1573
1574        let datasource = SyncingDatasource::new(
1575            tip,
1576            checkpoint,
1577            backfill,
1578            live,
1579            vec![UpdateType::Transaction],
1580            SyncConfig::default(),
1581        );
1582
1583        let state = Arc::new(SyncState::new(100, 500));
1584        let (tx, _rx) = mpsc::channel(1000);
1585        let cancel = CancellationToken::new();
1586        cancel.cancel(); // Cancel immediately
1587
1588        let result = datasource
1589            .backfill_if_needed(&state, 500, tx, cancel, metrics)
1590            .await;
1591
1592        assert!(result.is_ok()); // Should handle cancellation gracefully
1593    }
1594
1595    #[tokio::test]
1596    async fn test_backfill_batch_processing() {
1597        let tip = Arc::new(MockTipSource::new(500));
1598        let checkpoint = Arc::new(MockCheckpointStore::new(100));
1599        let mut mock_backfill = MockBackfillSource::new();
1600        // Add multiple batches
1601        mock_backfill.add_range(101, 200, create_test_updates(101, 100));
1602        mock_backfill.add_range(201, 300, create_test_updates(201, 100));
1603        mock_backfill.add_range(301, 400, create_test_updates(301, 100));
1604        let backfill = Arc::new(mock_backfill);
1605        let live = Arc::new(MockLiveSource::new());
1606        let metrics = Arc::new(MetricsCollection::default());
1607
1608        let datasource = SyncingDatasource::new(
1609            tip,
1610            checkpoint,
1611            backfill,
1612            live,
1613            vec![UpdateType::Transaction],
1614            SyncConfig {
1615                batch_size: 100,
1616                ..Default::default()
1617            },
1618        );
1619
1620        let state = Arc::new(SyncState::new(100, 400));
1621        let (tx, _rx) = mpsc::channel(1000);
1622        let cancel = CancellationToken::new();
1623
1624        let result = datasource
1625            .backfill_if_needed(&state, 400, tx, cancel, metrics)
1626            .await;
1627
1628        assert!(result.is_ok());
1629        let snapshot = state.snapshot().await;
1630        assert_eq!(snapshot.last_indexed, 400);
1631    }
1632
1633    // Additional edge case tests
1634
1635    #[test]
1636    fn test_filter_by_cutoff_exactly_at_cutoff() {
1637        // Updates exactly at cutoff should be filtered out (height > cutoff)
1638        // Heights 100-104 are <= 104, so all should be filtered out
1639        let mut updates = create_test_updates(100, 5);
1640        let filtered = filter_by_cutoff(&mut updates, 104);
1641
1642        // All updates filtered out, so should return None
1643        assert!(filtered.is_none());
1644    }
1645
1646    #[test]
1647    fn test_filter_by_cutoff_empty_updates() {
1648        let mut updates = Updates::Transactions(Vec::new());
1649        let filtered = filter_by_cutoff(&mut updates, 100);
1650        assert!(filtered.is_none());
1651    }
1652
1653    #[test]
1654    fn test_filter_by_cutoff_mixed_heights() {
1655        use arch_program::hash::Hash;
1656        use arch_program::sanitized::{ArchMessage, MessageHeader};
1657        use arch_sdk::{RollbackStatus, RuntimeTransaction, Status};
1658
1659        let mut txs = Vec::new();
1660        // Create transactions with mixed heights: some below, some at, some above cutoff
1661        for height in [50, 100, 101, 150, 200].iter() {
1662            txs.push(TransactionUpdate {
1663                transaction: arch_sdk::ProcessedTransaction {
1664                    runtime_transaction: RuntimeTransaction {
1665                        version: 1,
1666                        signatures: vec![],
1667                        message: ArchMessage {
1668                            header: MessageHeader {
1669                                num_readonly_signed_accounts: 0,
1670                                num_readonly_unsigned_accounts: 0,
1671                                num_required_signatures: 0,
1672                            },
1673                            account_keys: vec![],
1674                            instructions: vec![],
1675                            recent_blockhash: Hash::default(),
1676                        },
1677                    },
1678                    inner_instructions_list: vec![],
1679                    status: Status::Processed,
1680                    bitcoin_txid: None,
1681                    logs: vec![],
1682                    rollback_status: RollbackStatus::NotRolledback,
1683                },
1684                height: *height,
1685            });
1686        }
1687        let mut updates = Updates::Transactions(txs);
1688        let filtered = filter_by_cutoff(&mut updates, 100);
1689
1690        assert!(filtered.is_some());
1691        if let Some(Updates::Transactions(filtered_txs)) = filtered {
1692            // Only heights > 100 should pass: 101, 150, 200
1693            assert_eq!(filtered_txs.len(), 3);
1694            assert_eq!(filtered_txs[0].height, 101);
1695            assert_eq!(filtered_txs[1].height, 150);
1696            assert_eq!(filtered_txs[2].height, 200);
1697        } else {
1698            panic!("Expected Transactions variant");
1699        }
1700    }
1701
1702    #[test]
1703    fn test_filter_by_cutoff_reapplied_transactions() {
1704        let events = vec![crate::datasource::ReappliedTransactionsEvent {
1705            height: 100,
1706            transaction_hashes: vec!["hash1".to_string(), "hash2".to_string()],
1707        }];
1708        let mut updates = Updates::ReappliedTransactions(events.clone());
1709        let filtered = filter_by_cutoff(&mut updates, 150);
1710
1711        // ReappliedTransactions should always pass through
1712        assert!(filtered.is_some());
1713        if let Some(Updates::ReappliedTransactions(filtered_events)) = filtered {
1714            assert_eq!(filtered_events.len(), 1);
1715            assert_eq!(filtered_events[0].transaction_hashes.len(), 2);
1716        } else {
1717            panic!("Expected ReappliedTransactions variant");
1718        }
1719    }
1720
1721    #[test]
1722    fn test_max_height_in_updates_account_deletions() {
1723        let mut deletions = Vec::new();
1724        for i in 0..5 {
1725            deletions.push(AccountDeletion {
1726                pubkey: arch_program::pubkey::Pubkey::default(),
1727                height: 100 + i,
1728            });
1729        }
1730        let updates = Updates::AccountDeletions(deletions);
1731        let max_height = max_height_in_updates(&updates);
1732        assert_eq!(max_height, Some(104));
1733    }
1734
1735    #[tokio::test]
1736    async fn test_backfill_single_block() {
1737        let tip = Arc::new(MockTipSource::new(101));
1738        let checkpoint = Arc::new(MockCheckpointStore::new(100));
1739        let mut mock_backfill = MockBackfillSource::new();
1740        mock_backfill.add_range(101, 101, create_test_updates(101, 1));
1741        let backfill = Arc::new(mock_backfill);
1742        let live = Arc::new(MockLiveSource::new());
1743        let metrics = Arc::new(MetricsCollection::default());
1744
1745        let datasource = SyncingDatasource::new(
1746            tip,
1747            checkpoint,
1748            backfill,
1749            live,
1750            vec![UpdateType::Transaction],
1751            SyncConfig {
1752                batch_size: 1000,
1753                ..Default::default()
1754            },
1755        );
1756
1757        let state = Arc::new(SyncState::new(100, 101));
1758        let (tx, mut rx) = mpsc::channel(1000);
1759        let cancel = CancellationToken::new();
1760
1761        let result = datasource
1762            .backfill_if_needed(&state, 101, tx.clone(), cancel, metrics)
1763            .await;
1764
1765        assert!(result.is_ok());
1766        let snapshot = state.snapshot().await;
1767        assert_eq!(snapshot.last_indexed, 101);
1768
1769        // Verify update was forwarded
1770        let received = rx.recv().await;
1771        assert!(received.is_some());
1772    }
1773
1774    #[tokio::test]
1775    async fn test_backfill_already_at_tip() {
1776        let tip = Arc::new(MockTipSource::new(100));
1777        let checkpoint = Arc::new(MockCheckpointStore::new(100));
1778        let backfill = Arc::new(MockBackfillSource::new());
1779        let live = Arc::new(MockLiveSource::new());
1780        let metrics = Arc::new(MetricsCollection::default());
1781
1782        let datasource = SyncingDatasource::new(
1783            tip,
1784            checkpoint,
1785            backfill,
1786            live,
1787            vec![UpdateType::Transaction],
1788            SyncConfig::default(),
1789        );
1790
1791        let state = Arc::new(SyncState::new(100, 100));
1792        let (tx, _rx) = mpsc::channel(1000);
1793        let cancel = CancellationToken::new();
1794
1795        let result = datasource
1796            .backfill_if_needed(&state, 100, tx, cancel, metrics)
1797            .await;
1798
1799        assert!(result.is_ok());
1800        let snapshot = state.snapshot().await;
1801        assert_eq!(snapshot.last_indexed, 100);
1802    }
1803
1804    #[tokio::test]
1805    async fn test_backfill_past_tip() {
1806        let tip = Arc::new(MockTipSource::new(100));
1807        let checkpoint = Arc::new(MockCheckpointStore::new(150));
1808        let backfill = Arc::new(MockBackfillSource::new());
1809        let live = Arc::new(MockLiveSource::new());
1810        let metrics = Arc::new(MetricsCollection::default());
1811
1812        let datasource = SyncingDatasource::new(
1813            tip,
1814            checkpoint,
1815            backfill,
1816            live,
1817            vec![UpdateType::Transaction],
1818            SyncConfig::default(),
1819        );
1820
1821        let state = Arc::new(SyncState::new(150, 100));
1822        let (tx, _rx) = mpsc::channel(1000);
1823        let cancel = CancellationToken::new();
1824
1825        let result = datasource
1826            .backfill_if_needed(&state, 100, tx, cancel, metrics)
1827            .await;
1828
1829        assert!(result.is_ok());
1830        let snapshot = state.snapshot().await;
1831        assert_eq!(snapshot.last_indexed, 150); // Should remain unchanged
1832    }
1833
1834    #[tokio::test]
1835    async fn test_backfill_small_batch_size() {
1836        let tip = Arc::new(MockTipSource::new(105));
1837        let checkpoint = Arc::new(MockCheckpointStore::new(100));
1838        let mut mock_backfill = MockBackfillSource::new();
1839        mock_backfill.add_range(101, 101, create_test_updates(101, 1));
1840        mock_backfill.add_range(102, 102, create_test_updates(102, 1));
1841        mock_backfill.add_range(103, 103, create_test_updates(103, 1));
1842        mock_backfill.add_range(104, 104, create_test_updates(104, 1));
1843        mock_backfill.add_range(105, 105, create_test_updates(105, 1));
1844        let backfill = Arc::new(mock_backfill);
1845        let live = Arc::new(MockLiveSource::new());
1846        let metrics = Arc::new(MetricsCollection::default());
1847
1848        let datasource = SyncingDatasource::new(
1849            tip,
1850            checkpoint,
1851            backfill,
1852            live,
1853            vec![UpdateType::Transaction],
1854            SyncConfig {
1855                batch_size: 1,
1856                ..Default::default()
1857            },
1858        );
1859
1860        let state = Arc::new(SyncState::new(100, 105));
1861        let (tx, mut rx) = mpsc::channel(1000);
1862        let cancel = CancellationToken::new();
1863
1864        let result = datasource
1865            .backfill_if_needed(&state, 105, tx.clone(), cancel, metrics)
1866            .await;
1867
1868        assert!(result.is_ok());
1869        let snapshot = state.snapshot().await;
1870        assert_eq!(snapshot.last_indexed, 105);
1871
1872        // Verify all updates were forwarded
1873        let mut received_count = 0;
1874        while let Ok(Some(_)) =
1875            tokio::time::timeout(tokio::time::Duration::from_millis(100), rx.recv()).await
1876        {
1877            received_count += 1;
1878        }
1879        assert_eq!(received_count, 5);
1880    }
1881
1882    #[tokio::test]
1883    async fn test_sync_state_concurrent_updates() {
1884        let state = Arc::new(SyncState::new(100, 200));
1885
1886        // Simulate concurrent updates
1887        let state1 = Arc::clone(&state);
1888        let state2 = Arc::clone(&state);
1889        let state3 = Arc::clone(&state);
1890
1891        tokio::join!(
1892            async move { state1.set_last_indexed(150, false).await },
1893            async move { state2.set_safe_tip(250).await },
1894            async move { state3.set_pass_through(true).await },
1895        );
1896
1897        let snapshot = state.snapshot().await;
1898        assert_eq!(snapshot.last_indexed, 150);
1899        assert_eq!(snapshot.safe_tip, 250);
1900        assert!(snapshot.pass_through_enabled);
1901    }
1902
1903    #[tokio::test]
1904    async fn test_reconcile_state_transitions() {
1905        let tip = Arc::new(MockTipSource::new(300));
1906        let checkpoint = Arc::new(MockCheckpointStore::new(200));
1907        let mut mock_backfill = MockBackfillSource::new();
1908        mock_backfill.add_range(201, 300, create_test_updates(201, 100));
1909        let backfill = Arc::new(mock_backfill);
1910        let live = Arc::new(MockLiveSource::new());
1911        let metrics = Arc::new(MetricsCollection::default());
1912
1913        let datasource = SyncingDatasource::new(
1914            tip,
1915            checkpoint,
1916            backfill,
1917            live,
1918            vec![UpdateType::Transaction],
1919            SyncConfig {
1920                batch_size: 100,
1921                ..Default::default()
1922            },
1923        );
1924
1925        let state = Arc::new(SyncState::new(200, 200));
1926        state.set_pass_through(true).await;
1927
1928        // Verify initial state
1929        let snapshot = state.snapshot().await;
1930        assert!(snapshot.pass_through_enabled);
1931        match snapshot.phase {
1932            SyncPhase::LivePassthrough => {}
1933            _ => panic!("Expected LivePassthrough phase"),
1934        }
1935
1936        let (tx, _rx) = mpsc::channel(1000);
1937        let cancel = CancellationToken::new();
1938
1939        let result = datasource
1940            .reconcile_if_behind(&state, tx, cancel, metrics)
1941            .await;
1942
1943        assert!(result.is_ok());
1944
1945        // Verify final state after reconciliation
1946        let snapshot = state.snapshot().await;
1947        assert_eq!(snapshot.last_indexed, 300);
1948        assert!(snapshot.pass_through_enabled); // Should be re-enabled
1949        match snapshot.phase {
1950            SyncPhase::LivePassthrough => {}
1951            _ => panic!("Expected LivePassthrough phase after reconciliation"),
1952        }
1953    }
1954
1955    #[test]
1956    fn test_filter_by_cutoff_boundary_conditions() {
1957        // Test with cutoff = 0 (should filter nothing if heights are > 0)
1958        let mut updates = create_test_updates(1, 5);
1959        let filtered = filter_by_cutoff(&mut updates, 0);
1960        assert!(filtered.is_some());
1961        if let Some(Updates::Transactions(txs)) = filtered {
1962            assert_eq!(txs.len(), 5);
1963        } else {
1964            panic!("Expected Transactions variant");
1965        }
1966
1967        // Test with very large cutoff
1968        let mut updates = create_test_updates(100, 5);
1969        let filtered = filter_by_cutoff(&mut updates, u64::MAX);
1970        assert!(filtered.is_none());
1971    }
1972
1973    #[tokio::test]
1974    async fn test_catch_up_with_empty_backfill() {
1975        // When tip hasn't advanced, catch_up should do nothing
1976        let tip = Arc::new(MockTipSource::new(200));
1977        let checkpoint = Arc::new(MockCheckpointStore::new(200));
1978        let backfill = Arc::new(MockBackfillSource::new());
1979        let live = Arc::new(MockLiveSource::new());
1980        let metrics = Arc::new(MetricsCollection::default());
1981
1982        let datasource = SyncingDatasource::new(
1983            tip,
1984            checkpoint,
1985            backfill,
1986            live,
1987            vec![UpdateType::Transaction],
1988            SyncConfig::default(),
1989        );
1990
1991        let state = Arc::new(SyncState::new(200, 200));
1992        let (tx, _rx) = mpsc::channel(1000);
1993        let cancel = CancellationToken::new();
1994
1995        let result = datasource
1996            .catch_up_to_current_tip(&state, tx, cancel, metrics)
1997            .await;
1998
1999        assert!(result.is_ok());
2000        let snapshot = state.snapshot().await;
2001        assert_eq!(snapshot.last_indexed, 200);
2002    }
2003
2004    #[tokio::test]
2005    async fn test_backfill_with_tip_advancing() {
2006        // Simulate tip advancing during backfill
2007        let tip = Arc::new(MockTipSource::new(200));
2008        let checkpoint = Arc::new(MockCheckpointStore::new(100));
2009        let mut mock_backfill = MockBackfillSource::new();
2010        mock_backfill.add_range(101, 200, create_test_updates(101, 100));
2011        let backfill = Arc::new(mock_backfill);
2012        let live = Arc::new(MockLiveSource::new());
2013        let metrics = Arc::new(MetricsCollection::default());
2014
2015        let datasource = SyncingDatasource::new(
2016            tip,
2017            checkpoint,
2018            backfill,
2019            live,
2020            vec![UpdateType::Transaction],
2021            SyncConfig {
2022                batch_size: 50,
2023                ..Default::default()
2024            },
2025        );
2026
2027        let state = Arc::new(SyncState::new(100, 200));
2028        let (tx, _rx) = mpsc::channel(1000);
2029        let cancel = CancellationToken::new();
2030
2031        // Backfill should stop at final_target_tip even if tip advances
2032        let result = datasource
2033            .backfill_if_needed(&state, 200, tx, cancel, metrics)
2034            .await;
2035
2036        assert!(result.is_ok());
2037        let snapshot = state.snapshot().await;
2038        assert_eq!(snapshot.last_indexed, 200);
2039    }
2040}