blob_indexer/slots_processor/
mod.rs1use alloy::{
2 consensus::Transaction,
3 eips::{eip4844::kzg_to_versioned_hash, BlockId},
4 primitives::B256,
5};
6use anyhow::{anyhow, Context as AnyhowContext, Result};
7
8use crate::{clients::beacon::types::BlockHeader, utils::alloy::BlobTransactionExt};
9use tracing::{debug, info, Instrument};
10
11use crate::{
12 clients::{
13 blobscan::types::{Blob, BlobscanBlock, Block, Transaction as BlobscanTransaction},
14 common::ClientError,
15 },
16 context::CommonContext,
17};
18
19use self::error::{SlotProcessingError, SlotsProcessorError};
20
21pub mod error;
22
23const MAX_ALLOWED_REORG_DEPTH: u32 = 100;
24
25pub struct BlockData {
26 pub root: B256,
27 pub parent_root: B256,
28 pub slot: u32,
29 pub execution_block_hash: B256,
30}
31
32impl From<&BlockData> for BlockHeader {
33 fn from(block: &BlockData) -> Self {
34 BlockHeader {
35 root: block.root,
36 parent_root: block.parent_root,
37 slot: block.slot,
38 }
39 }
40}
41
42pub struct SlotsProcessor {
43 context: Box<dyn CommonContext>,
44 pub last_processed_block: Option<BlockHeader>,
45}
46
47impl SlotsProcessor {
48 pub fn new(
49 context: Box<dyn CommonContext>,
50 last_processed_block: Option<BlockHeader>,
51 ) -> SlotsProcessor {
52 Self {
53 context,
54 last_processed_block,
55 }
56 }
57
58 pub async fn process_slots(
59 &mut self,
60 initial_slot: u32,
61 final_slot: u32,
62 ) -> Result<(), SlotsProcessorError> {
63 let is_reverse = initial_slot > final_slot;
64 let slots = if is_reverse {
65 (final_slot..initial_slot).rev().collect::<Vec<_>>()
66 } else {
67 (initial_slot..final_slot).collect::<Vec<_>>()
68 };
69
70 let mut last_processed_block = self.last_processed_block.clone();
71
72 for current_slot in slots {
73 let block_header = match self
74 .context
75 .beacon_client()
76 .get_block_header(current_slot.into())
77 .await?
78 {
79 Some(header) => header,
80 None => {
81 debug!(current_slot, "Skipping as there is no beacon block header");
82
83 continue;
84 }
85 };
86
87 if !is_reverse {
88 if let Some(prev_block_header) = last_processed_block {
89 if prev_block_header.root != B256::ZERO
90 && prev_block_header.root != block_header.parent_root
91 {
92 info!(
93 new_head_slot = block_header.slot,
94 old_head_slot = prev_block_header.slot,
95 new_head_block_root = ?block_header.root,
96 old_head_block_root = ?prev_block_header.root,
97 "Reorg detected!",
98 );
99
100 self.process_reorg(&prev_block_header, &block_header)
101 .await
102 .map_err(|error| SlotsProcessorError::FailedReorgProcessing {
103 old_slot: prev_block_header.slot,
104 new_slot: block_header.slot,
105 new_head_block_root: block_header.root,
106 old_head_block_root: prev_block_header.root,
107 error,
108 })?;
109 }
110 }
111 }
112
113 if let Err(error) = self.process_block(&block_header).await {
114 return Err(SlotsProcessorError::FailedSlotsProcessing {
115 initial_slot,
116 final_slot,
117 failed_slot: current_slot,
118 error,
119 });
120 }
121
122 last_processed_block = Some(block_header);
123 }
124
125 self.last_processed_block = last_processed_block;
126
127 Ok(())
128 }
129
130 async fn process_block(
131 &self,
132 beacon_block_header: &BlockHeader,
133 ) -> Result<(), SlotProcessingError> {
134 let beacon_client = self.context.beacon_client();
135 let blobscan_client = self.context.blobscan_client();
136 let provider = self.context.provider();
137 let beacon_block_root = beacon_block_header.root;
138 let slot = beacon_block_header.slot;
139
140 let beacon_block = match beacon_client.get_block(slot.into()).await? {
141 Some(block) => block,
142 None => {
143 debug!(slot = slot, "Skipping as there is no beacon block");
144
145 return Ok(());
146 }
147 };
148
149 let execution_payload = match beacon_block.execution_payload {
150 Some(payload) => payload,
151 None => {
152 debug!(
153 slot,
154 "Skipping as beacon block doesn't contain execution payload"
155 );
156
157 return Ok(());
158 }
159 };
160
161 let has_kzg_blob_commitments = match beacon_block.blob_kzg_commitments {
162 Some(commitments) => !commitments.is_empty(),
163 None => false,
164 };
165
166 if !has_kzg_blob_commitments {
167 debug!(
168 slot,
169 "Skipping as beacon block doesn't contain blob kzg commitments"
170 );
171
172 return Ok(());
173 }
174
175 let execution_block_hash = execution_payload.block_hash;
176
177 let execution_block = provider
180 .get_block(BlockId::Hash(execution_block_hash.into()))
181 .full()
182 .await?
183 .with_context(|| format!("Execution block {execution_block_hash} not found"))?;
184
185 let blob_txs = execution_block.transactions.filter_blob_transactions();
186
187 if blob_txs.is_empty() {
188 return Err(anyhow!("Blocks mismatch: Consensus block \"{beacon_block_root}\" contains blob KZG commitments, but the corresponding execution block \"{execution_block_hash:#?}\" does not contain any blob transactions").into());
189 }
190
191 let blobs = match beacon_client
192 .get_blobs(slot.into())
193 .await
194 .map_err(SlotProcessingError::ClientError)?
195 {
196 Some(blobs) => {
197 if blobs.is_empty() {
198 debug!(slot, "Skipping as blobs sidecar is empty");
199
200 return Ok(());
201 } else {
202 blobs
203 }
204 }
205 None => {
206 debug!(slot, "Skipping as there is no blobs sidecar");
207
208 return Ok(());
209 }
210 };
211
212 let block_entity = Block::try_from((&execution_block, slot))?;
214 let tx_entities = blob_txs
215 .iter()
216 .map(|tx| BlobscanTransaction::try_from((*tx, &execution_block)))
217 .collect::<Result<Vec<BlobscanTransaction>>>()?;
218
219 let blob_entities = blob_txs
220 .into_iter()
221 .flat_map(|tx| {
222 tx.blob_versioned_hashes()
223 .into_iter()
224 .flatten()
225 .enumerate()
226 .map( |(i, versioned_hash)| {
227 let tx_hash = tx.inner.hash();
228 let blob = blobs
229 .iter()
230 .find(|blob| {
231 let vh = kzg_to_versioned_hash(blob.kzg_commitment.as_ref());
232
233 vh.eq(versioned_hash)
234 })
235 .with_context(|| format!(
236 "Sidecar not found for blob {i:?} with versioned hash {versioned_hash:?} from tx {tx_hash:?}"
237 ))?;
238
239 Ok(Blob::from((blob, (i as u32), tx_hash)))
240 })
241 })
242 .collect::<Result<Vec<Blob>, anyhow::Error>>()?;
243
244 blobscan_client
245 .index(block_entity, tx_entities, blob_entities)
246 .await
247 .map_err(SlotProcessingError::ClientError)?;
248
249 let block_number = execution_block.header.number;
250 info!(slot, block_number, "Block indexed successfully");
251
252 Ok(())
253 }
254
255 async fn process_reorg(
257 &mut self,
258 old_head_header: &BlockHeader,
259 new_head_header: &BlockHeader,
260 ) -> Result<(), anyhow::Error> {
261 let mut current_old_slot = old_head_header.slot;
262 let mut reorg_depth = 0;
263
264 let mut rewinded_blocks: Vec<B256> = vec![];
265
266 while reorg_depth <= MAX_ALLOWED_REORG_DEPTH && current_old_slot > 0 {
267 if let Some(old_blobscan_block) = self
270 .context
271 .blobscan_client()
272 .get_block(current_old_slot)
273 .await?
274 {
275 let canonical_block_path = self
276 .get_canonical_block_path(&old_blobscan_block, new_head_header.root)
277 .await?;
278
279 if !canonical_block_path.is_empty() {
281 let canonical_block_path =
282 canonical_block_path.into_iter().rev().collect::<Vec<_>>();
283
284 let forwarded_blocks = canonical_block_path
285 .iter()
286 .map(|block| block.execution_block_hash)
287 .collect::<Vec<_>>();
288
289 self.context
290 .blobscan_client()
291 .handle_reorg(rewinded_blocks.clone(), forwarded_blocks.clone())
292 .await?;
293
294 info!(rewinded_blocks = ?rewinded_blocks, forwarded_blocks = ?forwarded_blocks, "Reorg handled!");
295
296 let canonical_block_headers: Vec<BlockHeader> = canonical_block_path
297 .iter()
298 .map(|block| block.into())
299 .collect::<Vec<_>>();
300
301 for block in canonical_block_headers.iter() {
304 if block.slot != new_head_header.slot {
305 let reorg_span = tracing::info_span!(
306 parent: &tracing::Span::current(),
307 "forwarded_block",
308 );
309
310 self.process_block(block)
311 .instrument(reorg_span)
312 .await
313 .with_context(|| "Failed to sync forwarded block".to_string())?;
314 }
315 }
316
317 return Ok(());
318 }
319
320 rewinded_blocks.push(old_blobscan_block.hash);
321 }
322
323 current_old_slot -= 1;
324 reorg_depth += 1;
325 }
326
327 let rewinded_blocks_count = rewinded_blocks.len();
328
329 if rewinded_blocks_count > 0 {
330 return Err(anyhow!("{rewinded_blocks_count} Blobscan blocks to rewind detected but no common ancestor found"));
331 }
332
333 info!("Skipping reorg handling: no Blobscan blocks to rewind found");
334
335 Ok(())
336 }
337
338 async fn get_canonical_block_path(
340 &mut self,
341 blobscan_block: &BlobscanBlock,
342 head_block_root: B256,
343 ) -> Result<Vec<BlockData>, ClientError> {
344 let beacon_client = self.context.beacon_client();
345 let mut canonical_execution_blocks: Vec<BlockData> = vec![];
346
347 let mut canonical_block = match beacon_client.get_block(head_block_root.into()).await? {
348 Some(block) => block,
349 None => {
350 return Ok(vec![]);
351 }
352 };
353
354 if let Some(execution_payload) = &canonical_block.execution_payload {
355 if execution_payload.block_hash == blobscan_block.hash {
356 return Ok(vec![]);
357 }
358 }
359
360 let mut current_canonical_block_root = head_block_root;
361
362 while canonical_block.parent_root != B256::ZERO {
363 let canonical_block_parent_root = canonical_block.parent_root;
364
365 if canonical_block.slot < blobscan_block.slot {
366 return Ok(vec![]);
367 }
368
369 if let Some(execution_payload) = &canonical_block.execution_payload {
370 if execution_payload.block_hash == blobscan_block.hash {
371 return Ok(canonical_execution_blocks);
372 }
373
374 canonical_execution_blocks.push(BlockData {
375 root: current_canonical_block_root,
376 parent_root: canonical_block_parent_root,
377 slot: canonical_block.slot,
378 execution_block_hash: execution_payload.block_hash,
379 });
380 }
381
382 canonical_block = match beacon_client
383 .get_block(canonical_block_parent_root.into())
384 .await?
385 {
386 Some(block) => block,
387 None => {
388 return Ok(vec![]);
389 }
390 };
391
392 current_canonical_block_root = canonical_block_parent_root;
393 }
394
395 Ok(vec![])
396 }
397}