blob_indexer/indexer/
mod.rs1use alloy::primitives::B256;
2use tokio::sync::mpsc::{self};
3use tracing::{error, info, info_span};
4
5use crate::{
6 clients::beacon::types::{BlockHeader, BlockId, BlockIdResolution},
7 context::{CommonContext, Context},
8 indexer::{
9 tasks::{
10 indexing::{IndexingTask, RunParams as IndexingTaskRunParams},
11 sse_indexing::{RunParams as SSEIndexingTaskRunParams, SSEIndexingTask},
12 },
13 types::{
14 ErrorResport, IndexingTaskJoinHandle, TaskErrorChannelReceiver, TaskErrorChannelSender,
15 },
16 },
17 synchronizer::{CheckpointType, CommonSynchronizer, SynchronizerBuilder},
18};
19
20use self::error::IndexerError;
21
22pub mod error;
23pub mod tasks;
24pub mod types;
25
26pub struct Indexer {
27 context: Box<dyn CommonContext>,
28 disable_backfill: bool,
29
30 error_report_tx: TaskErrorChannelSender,
31 error_report_rx: TaskErrorChannelReceiver,
32}
33
34pub type IndexerResult<T> = Result<T, IndexerError>;
35
36impl Indexer {
37 pub fn new(context: Context, disable_backfill: bool) -> Self {
38 let (error_report_tx, error_report_rx) = mpsc::channel::<ErrorResport>(32);
39
40 Self {
41 context: Box::new(context),
42 disable_backfill,
43 error_report_rx,
44 error_report_tx,
45 }
46 }
47
48 pub async fn index_from(&mut self, from_block_id: BlockId) -> IndexerResult<()> {
49 let slot = from_block_id
50 .resolve_to_slot(self.context.beacon_client())
51 .await?;
52
53 self.start_sse_listening_task(SSEIndexingTaskRunParams {
54 last_synced_block: None,
55 last_synced_slot: Some(slot),
56 });
57
58 Ok(())
59 }
60
61 pub async fn index_block_range(
62 &mut self,
63 from_block_id: BlockId,
64 to_block_id: BlockId,
65 ) -> IndexerResult<()> {
66 let mut builder = SynchronizerBuilder::new();
67
68 builder.with_checkpoint(None);
69
70 let mut synchronizer = builder.build(self.context.clone());
71
72 synchronizer.sync_blocks(from_block_id, to_block_id).await?;
73
74 Ok(())
75 }
76 pub async fn index(&mut self) -> IndexerResult<()> {
77 let sync_state = match self.context.blobscan_client().get_sync_state().await {
78 Ok(state) => state,
79 Err(error) => {
80 error!(?error, "Failed to fetch blobscan's sync state");
81
82 return Err(IndexerError::IndexerStateRetrievalError(error));
83 }
84 };
85 let lowest_synced_slot = sync_state
86 .as_ref()
87 .and_then(|state| state.last_lower_synced_slot);
88 let last_synced_block = sync_state.as_ref().and_then(|state| {
89 match (
90 state.last_upper_synced_block_root,
91 state.last_upper_synced_block_slot,
92 ) {
93 (Some(root), Some(slot)) => Some(BlockHeader {
94 parent_root: B256::ZERO,
95 root,
96 slot,
97 }),
98 _ => None,
99 }
100 });
101 let last_synced_slot = sync_state
102 .as_ref()
103 .and_then(|state| state.last_upper_synced_slot);
104
105 info!(
106 lowest_synced_slot = ?lowest_synced_slot,
107 last_synced_block_slot = ?last_synced_block.as_ref().map(|block| block.slot),
108 last_synced_block_root = ?last_synced_block.as_ref().map(|block| block.root),
109 "Starting indexer…",
110 );
111
112 let dencun_fork_slot = self.context.network().dencun_fork_slot;
113 let backfill_completed = lowest_synced_slot.is_some_and(|slot| slot <= dencun_fork_slot);
114
115 if !self.disable_backfill && !backfill_completed {
116 let task = IndexingTask::new(
117 "backfill",
118 self.context.clone(),
119 Some(info_span!("backfill")),
120 );
121
122 let current_lowest_block_id = match lowest_synced_slot {
123 Some(lowest_synced_slot) => lowest_synced_slot.saturating_sub(1).into(),
124 None => match last_synced_slot {
125 Some(last_synced_slot) => last_synced_slot.saturating_sub(1).into(),
126 None => BlockId::Head,
127 },
128 };
129
130 task.run(IndexingTaskRunParams {
131 error_report_tx: self.error_report_tx.clone(),
132 result_report_tx: None,
133 from_block_id: current_lowest_block_id,
134 to_block_id: dencun_fork_slot.into(),
135 prev_block: None,
136 checkpoint: Some(CheckpointType::Lower),
137 });
138 }
139
140 self.start_sse_listening_task(SSEIndexingTaskRunParams {
141 last_synced_block,
142 last_synced_slot,
143 });
144
145 if let Some(error_report) = self.error_report_rx.recv().await {
146 return Err(IndexerError::IndexingTaskError {
147 task_name: error_report.task_name,
148 error: error_report.error,
149 });
150 }
151
152 Ok(())
153 }
154
155 fn start_sse_listening_task(&self, params: SSEIndexingTaskRunParams) -> IndexingTaskJoinHandle {
156 let task = SSEIndexingTask::new(self.context.clone(), self.error_report_tx.clone());
157
158 task.run(params)
159 }
160}