blob_indexer/slots_processor/
mod.rs

1use alloy::{
2    consensus::Transaction,
3    eips::{eip4844::kzg_to_versioned_hash, BlockId},
4    primitives::B256,
5};
6use anyhow::{anyhow, Context as AnyhowContext, Result};
7
8use crate::{clients::beacon::types::BlockHeader, utils::alloy::BlobTransactionExt};
9use tracing::{debug, info, Instrument};
10
11use crate::{
12    clients::{
13        blobscan::types::{Blob, BlobscanBlock, Block, Transaction as BlobscanTransaction},
14        common::ClientError,
15    },
16    context::CommonContext,
17};
18
19use self::error::{SlotProcessingError, SlotsProcessorError};
20
21pub mod error;
22
/// Bounds how far `process_reorg` walks back, slot by slot, from the old head while looking
/// for a common ancestor: its loop keeps going while the depth counter is `<=` this value.
const MAX_ALLOWED_REORG_DEPTH: u32 = 100;
24
25pub struct BlockData {
26    pub root: B256,
27    pub parent_root: B256,
28    pub slot: u32,
29    pub execution_block_hash: B256,
30}
31
32impl From<&BlockData> for BlockHeader {
33    fn from(block: &BlockData) -> Self {
34        BlockHeader {
35            root: block.root,
36            parent_root: block.parent_root,
37            slot: block.slot,
38        }
39    }
40}
41
/// Walks beacon slots, indexes blocks that carry blobs through the Blobscan client and
/// handles reorgs detected between consecutive processed blocks.
pub struct SlotsProcessor {
    /// Gives access to the beacon client, the Blobscan client and the execution provider.
    context: Box<dyn CommonContext>,
    /// Header of the last block handled by a successful `process_slots` call; it is the
    /// reference point used to detect reorgs when processing forward.
    pub last_processed_block: Option<BlockHeader>,
}
46
47impl SlotsProcessor {
48    pub fn new(
49        context: Box<dyn CommonContext>,
50        last_processed_block: Option<BlockHeader>,
51    ) -> SlotsProcessor {
52        Self {
53            context,
54            last_processed_block,
55        }
56    }
57
58    pub async fn process_slots(
59        &mut self,
60        initial_slot: u32,
61        final_slot: u32,
62    ) -> Result<(), SlotsProcessorError> {
63        let is_reverse = initial_slot > final_slot;
64        let slots = if is_reverse {
65            (final_slot..initial_slot).rev().collect::<Vec<_>>()
66        } else {
67            (initial_slot..final_slot).collect::<Vec<_>>()
68        };
69
70        let mut last_processed_block = self.last_processed_block.clone();
71
72        for current_slot in slots {
73            let block_header = match self
74                .context
75                .beacon_client()
76                .get_block_header(current_slot.into())
77                .await?
78            {
79                Some(header) => header,
80                None => {
81                    debug!(current_slot, "Skipping as there is no beacon block header");
82
83                    continue;
84                }
85            };
86
87            if !is_reverse {
88                if let Some(prev_block_header) = last_processed_block {
89                    if prev_block_header.root != B256::ZERO
90                        && prev_block_header.root != block_header.parent_root
91                    {
92                        info!(
93                            new_head_slot = block_header.slot,
94                            old_head_slot = prev_block_header.slot,
95                            new_head_block_root = ?block_header.root,
96                            old_head_block_root = ?prev_block_header.root,
97                            "Reorg detected!",
98                        );
99
100                        self.process_reorg(&prev_block_header, &block_header)
101                            .await
102                            .map_err(|error| SlotsProcessorError::FailedReorgProcessing {
103                                old_slot: prev_block_header.slot,
104                                new_slot: block_header.slot,
105                                new_head_block_root: block_header.root,
106                                old_head_block_root: prev_block_header.root,
107                                error,
108                            })?;
109                    }
110                }
111            }
112
113            if let Err(error) = self.process_block(&block_header).await {
114                return Err(SlotsProcessorError::FailedSlotsProcessing {
115                    initial_slot,
116                    final_slot,
117                    failed_slot: current_slot,
118                    error,
119                });
120            }
121
122            last_processed_block = Some(block_header);
123        }
124
125        self.last_processed_block = last_processed_block;
126
127        Ok(())
128    }
129
130    async fn process_block(
131        &self,
132        beacon_block_header: &BlockHeader,
133    ) -> Result<(), SlotProcessingError> {
134        let beacon_client = self.context.beacon_client();
135        let blobscan_client = self.context.blobscan_client();
136        let provider = self.context.provider();
137        let beacon_block_root = beacon_block_header.root;
138        let slot = beacon_block_header.slot;
139
140        let beacon_block = match beacon_client.get_block(slot.into()).await? {
141            Some(block) => block,
142            None => {
143                debug!(slot = slot, "Skipping as there is no beacon block");
144
145                return Ok(());
146            }
147        };
148
149        let execution_payload = match beacon_block.execution_payload {
150            Some(payload) => payload,
151            None => {
152                debug!(
153                    slot,
154                    "Skipping as beacon block doesn't contain execution payload"
155                );
156
157                return Ok(());
158            }
159        };
160
161        let has_kzg_blob_commitments = match beacon_block.blob_kzg_commitments {
162            Some(commitments) => !commitments.is_empty(),
163            None => false,
164        };
165
166        if !has_kzg_blob_commitments {
167            debug!(
168                slot,
169                "Skipping as beacon block doesn't contain blob kzg commitments"
170            );
171
172            return Ok(());
173        }
174
175        let execution_block_hash = execution_payload.block_hash;
176
177        // Fetch execution block and perform some checks
178
179        let execution_block = provider
180            .get_block(BlockId::Hash(execution_block_hash.into()))
181            .full()
182            .await?
183            .with_context(|| format!("Execution block {execution_block_hash} not found"))?;
184
185        let blob_txs = execution_block.transactions.filter_blob_transactions();
186
187        if blob_txs.is_empty() {
188            return Err(anyhow!("Blocks mismatch: Consensus block \"{beacon_block_root}\" contains blob KZG commitments, but the corresponding execution block \"{execution_block_hash:#?}\" does not contain any blob transactions").into());
189        }
190
191        let blobs = match beacon_client
192            .get_blobs(slot.into())
193            .await
194            .map_err(SlotProcessingError::ClientError)?
195        {
196            Some(blobs) => {
197                if blobs.is_empty() {
198                    debug!(slot, "Skipping as blobs sidecar is empty");
199
200                    return Ok(());
201                } else {
202                    blobs
203                }
204            }
205            None => {
206                debug!(slot, "Skipping as there is no blobs sidecar");
207
208                return Ok(());
209            }
210        };
211
212        // Create entities to be indexed
213        let block_entity = Block::try_from((&execution_block, slot))?;
214        let tx_entities = blob_txs
215            .iter()
216            .map(|tx| BlobscanTransaction::try_from((*tx, &execution_block)))
217            .collect::<Result<Vec<BlobscanTransaction>>>()?;
218
219        let blob_entities = blob_txs
220            .into_iter()
221            .flat_map(|tx| {
222               tx.blob_versioned_hashes()
223                    .into_iter()
224                    .flatten()
225                    .enumerate()
226                    .map( |(i, versioned_hash)| {
227                        let tx_hash = tx.inner.hash();
228                        let blob = blobs
229                            .iter()
230                            .find(|blob| {
231                                let vh = kzg_to_versioned_hash(blob.kzg_commitment.as_ref());
232
233                                vh.eq(versioned_hash)
234                            })
235                            .with_context(|| format!(
236                                "Sidecar not found for blob {i:?} with versioned hash {versioned_hash:?} from tx {tx_hash:?}"
237                            ))?;
238
239                        Ok(Blob::from((blob, (i as u32), tx_hash)))
240                    })
241            })
242            .collect::<Result<Vec<Blob>, anyhow::Error>>()?;
243
244        blobscan_client
245            .index(block_entity, tx_entities, blob_entities)
246            .await
247            .map_err(SlotProcessingError::ClientError)?;
248
249        let block_number = execution_block.header.number;
250        info!(slot, block_number, "Block indexed successfully");
251
252        Ok(())
253    }
254
255    /// Handles reorgs by rewinding the blobscan blocks to the common ancestor and forwarding to the new head.
256    async fn process_reorg(
257        &mut self,
258        old_head_header: &BlockHeader,
259        new_head_header: &BlockHeader,
260    ) -> Result<(), anyhow::Error> {
261        let mut current_old_slot = old_head_header.slot;
262        let mut reorg_depth = 0;
263
264        let mut rewinded_blocks: Vec<B256> = vec![];
265
266        while reorg_depth <= MAX_ALLOWED_REORG_DEPTH && current_old_slot > 0 {
267            // We iterate over blocks by slot and not block root as blobscan blocks don't
268            // have parent root we can use to traverse the chain
269            if let Some(old_blobscan_block) = self
270                .context
271                .blobscan_client()
272                .get_block(current_old_slot)
273                .await?
274            {
275                let canonical_block_path = self
276                    .get_canonical_block_path(&old_blobscan_block, new_head_header.root)
277                    .await?;
278
279                // If a path exists, we've found the common ancient block
280                if !canonical_block_path.is_empty() {
281                    let canonical_block_path =
282                        canonical_block_path.into_iter().rev().collect::<Vec<_>>();
283
284                    let forwarded_blocks = canonical_block_path
285                        .iter()
286                        .map(|block| block.execution_block_hash)
287                        .collect::<Vec<_>>();
288
289                    self.context
290                        .blobscan_client()
291                        .handle_reorg(rewinded_blocks.clone(), forwarded_blocks.clone())
292                        .await?;
293
294                    info!(rewinded_blocks = ?rewinded_blocks, forwarded_blocks = ?forwarded_blocks, "Reorg handled!");
295
296                    let canonical_block_headers: Vec<BlockHeader> = canonical_block_path
297                        .iter()
298                        .map(|block| block.into())
299                        .collect::<Vec<_>>();
300
301                    // If the new canonical block path includes blocks beyond the new head block,
302                    // they were skipped and must be processed.
303                    for block in canonical_block_headers.iter() {
304                        if block.slot != new_head_header.slot {
305                            let reorg_span = tracing::info_span!(
306                                parent: &tracing::Span::current(),
307                                "forwarded_block",
308                            );
309
310                            self.process_block(block)
311                                .instrument(reorg_span)
312                                .await
313                                .with_context(|| "Failed to sync forwarded block".to_string())?;
314                        }
315                    }
316
317                    return Ok(());
318                }
319
320                rewinded_blocks.push(old_blobscan_block.hash);
321            }
322
323            current_old_slot -= 1;
324            reorg_depth += 1;
325        }
326
327        let rewinded_blocks_count = rewinded_blocks.len();
328
329        if rewinded_blocks_count > 0 {
330            return Err(anyhow!("{rewinded_blocks_count} Blobscan blocks to rewind detected but no common ancestor found"));
331        }
332
333        info!("Skipping reorg handling: no Blobscan blocks to rewind found");
334
335        Ok(())
336    }
337
338    /// Returns the path of blocks with execution payload from the head block to the provided block.
339    async fn get_canonical_block_path(
340        &mut self,
341        blobscan_block: &BlobscanBlock,
342        head_block_root: B256,
343    ) -> Result<Vec<BlockData>, ClientError> {
344        let beacon_client = self.context.beacon_client();
345        let mut canonical_execution_blocks: Vec<BlockData> = vec![];
346
347        let mut canonical_block = match beacon_client.get_block(head_block_root.into()).await? {
348            Some(block) => block,
349            None => {
350                return Ok(vec![]);
351            }
352        };
353
354        if let Some(execution_payload) = &canonical_block.execution_payload {
355            if execution_payload.block_hash == blobscan_block.hash {
356                return Ok(vec![]);
357            }
358        }
359
360        let mut current_canonical_block_root = head_block_root;
361
362        while canonical_block.parent_root != B256::ZERO {
363            let canonical_block_parent_root = canonical_block.parent_root;
364
365            if canonical_block.slot < blobscan_block.slot {
366                return Ok(vec![]);
367            }
368
369            if let Some(execution_payload) = &canonical_block.execution_payload {
370                if execution_payload.block_hash == blobscan_block.hash {
371                    return Ok(canonical_execution_blocks);
372                }
373
374                canonical_execution_blocks.push(BlockData {
375                    root: current_canonical_block_root,
376                    parent_root: canonical_block_parent_root,
377                    slot: canonical_block.slot,
378                    execution_block_hash: execution_payload.block_hash,
379                });
380            }
381
382            canonical_block = match beacon_client
383                .get_block(canonical_block_parent_root.into())
384                .await?
385            {
386                Some(block) => block,
387                None => {
388                    return Ok(vec![]);
389                }
390            };
391
392            current_canonical_block_root = canonical_block_parent_root;
393        }
394
395        Ok(vec![])
396    }
397}