// blob_indexer/indexer/mod.rs

1use alloy::primitives::B256;
2use tokio::sync::mpsc::{self};
3use tracing::{error, info, info_span};
4
5use crate::{
6    clients::beacon::types::{BlockHeader, BlockId, BlockIdResolution},
7    context::{CommonContext, Context},
8    indexer::{
9        tasks::{
10            indexing::{IndexingTask, RunParams as IndexingTaskRunParams},
11            sse_indexing::{RunParams as SSEIndexingTaskRunParams, SSEIndexingTask},
12        },
13        types::{
14            ErrorResport, IndexingTaskJoinHandle, TaskErrorChannelReceiver, TaskErrorChannelSender,
15        },
16    },
17    synchronizer::{CheckpointType, CommonSynchronizer, SynchronizerBuilder},
18};
19
20use self::error::IndexerError;
21
22pub mod error;
23pub mod tasks;
24pub mod types;
25
26pub struct Indexer {
27    context: Box<dyn CommonContext>,
28    disable_backfill: bool,
29
30    error_report_tx: TaskErrorChannelSender,
31    error_report_rx: TaskErrorChannelReceiver,
32}
33
34pub type IndexerResult<T> = Result<T, IndexerError>;
35
36impl Indexer {
37    pub fn new(context: Context, disable_backfill: bool) -> Self {
38        let (error_report_tx, error_report_rx) = mpsc::channel::<ErrorResport>(32);
39
40        Self {
41            context: Box::new(context),
42            disable_backfill,
43            error_report_rx,
44            error_report_tx,
45        }
46    }
47
48    pub async fn index_from(&mut self, from_block_id: BlockId) -> IndexerResult<()> {
49        let slot = from_block_id
50            .resolve_to_slot(self.context.beacon_client())
51            .await?;
52
53        self.start_sse_listening_task(SSEIndexingTaskRunParams {
54            last_synced_block: None,
55            last_synced_slot: Some(slot),
56        });
57
58        Ok(())
59    }
60
61    pub async fn index_block_range(
62        &mut self,
63        from_block_id: BlockId,
64        to_block_id: BlockId,
65    ) -> IndexerResult<()> {
66        let mut builder = SynchronizerBuilder::new();
67
68        builder.with_checkpoint(None);
69
70        let mut synchronizer = builder.build(self.context.clone());
71
72        synchronizer.sync_blocks(from_block_id, to_block_id).await?;
73
74        Ok(())
75    }
76    pub async fn index(&mut self) -> IndexerResult<()> {
77        let sync_state = match self.context.blobscan_client().get_sync_state().await {
78            Ok(state) => state,
79            Err(error) => {
80                error!(?error, "Failed to fetch blobscan's sync state");
81
82                return Err(IndexerError::IndexerStateRetrievalError(error));
83            }
84        };
85        let lowest_synced_slot = sync_state
86            .as_ref()
87            .and_then(|state| state.last_lower_synced_slot);
88        let last_synced_block = sync_state.as_ref().and_then(|state| {
89            match (
90                state.last_upper_synced_block_root,
91                state.last_upper_synced_block_slot,
92            ) {
93                (Some(root), Some(slot)) => Some(BlockHeader {
94                    parent_root: B256::ZERO,
95                    root,
96                    slot,
97                }),
98                _ => None,
99            }
100        });
101        let last_synced_slot = sync_state
102            .as_ref()
103            .and_then(|state| state.last_upper_synced_slot);
104
105        info!(
106            lowest_synced_slot = ?lowest_synced_slot,
107            last_synced_block_slot = ?last_synced_block.as_ref().map(|block| block.slot),
108            last_synced_block_root = ?last_synced_block.as_ref().map(|block| block.root),
109            "Starting indexer…",
110        );
111
112        let dencun_fork_slot = self.context.network().dencun_fork_slot;
113        let backfill_completed = lowest_synced_slot.is_some_and(|slot| slot <= dencun_fork_slot);
114
115        if !self.disable_backfill && !backfill_completed {
116            let task = IndexingTask::new(
117                "backfill",
118                self.context.clone(),
119                Some(info_span!("backfill")),
120            );
121
122            let current_lowest_block_id = match lowest_synced_slot {
123                Some(lowest_synced_slot) => lowest_synced_slot.saturating_sub(1).into(),
124                None => match last_synced_slot {
125                    Some(last_synced_slot) => last_synced_slot.saturating_sub(1).into(),
126                    None => BlockId::Head,
127                },
128            };
129
130            task.run(IndexingTaskRunParams {
131                error_report_tx: self.error_report_tx.clone(),
132                result_report_tx: None,
133                from_block_id: current_lowest_block_id,
134                to_block_id: dencun_fork_slot.into(),
135                prev_block: None,
136                checkpoint: Some(CheckpointType::Lower),
137            });
138        }
139
140        self.start_sse_listening_task(SSEIndexingTaskRunParams {
141            last_synced_block,
142            last_synced_slot,
143        });
144
145        if let Some(error_report) = self.error_report_rx.recv().await {
146            return Err(IndexerError::IndexingTaskError {
147                task_name: error_report.task_name,
148                error: error_report.error,
149            });
150        }
151
152        Ok(())
153    }
154
155    fn start_sse_listening_task(&self, params: SSEIndexingTaskRunParams) -> IndexingTaskJoinHandle {
156        let task = SSEIndexingTask::new(self.context.clone(), self.error_report_tx.clone());
157
158        task.run(params)
159    }
160}