blob_indexer/indexer/tasks/
sse_indexing.rs

1use alloy::primitives::B256;
2use anyhow::anyhow;
3use futures::{FutureExt, StreamExt};
4use reqwest_eventsource::Event;
5use tokio::{sync::oneshot, task::JoinHandle};
6use tracing::{debug, info, info_span, Instrument};
7
8use crate::{
9    clients::{
10        beacon::types::{BlockHeader, FinalizedCheckpointEventData, HeadEventData, Topic},
11        blobscan::types::BlockchainSyncState,
12        common::ClientError,
13    },
14    context::CommonContext,
15    indexer::{
16        tasks::indexing::{IndexingTask, RunParams as IndexingRunParams},
17        types::{
18            ErrorResport, IndexingTaskJoinHandle, TaskErrorChannelSender, TaskResult,
19            TaskResultChannelReceiver,
20        },
21    },
22    synchronizer::{CheckpointType, CommonSynchronizer, SynchronizerBuilder},
23    utils::alloy::B256Ext,
24};
25
/// Errors that can terminate the SSE indexing task.
///
/// Any of these is forwarded through the task's error report channel by
/// `SSEIndexingTask::run`, after which the task stops.
#[derive(Debug, thiserror::Error)]
pub enum SSEIndexingError {
    /// The event stream yielded an error. `run` converts every stream error
    /// other than `StreamEnded` into this variant.
    #[error("an error ocurred while receiving events from the SSE stream")]
    ConnectionFailure(#[from] reqwest_eventsource::Error),
    /// The beacon client failed to create the event subscription.
    #[error("failed to subscribe to SSE stream")]
    FailedSubscription(#[source] ClientError),
    /// The sync state could not be fetched from the blobscan client while
    /// preparing to resubscribe after the stream ended.
    #[error("failed to fetch indexer state")]
    IndexerStateRetrievalError(#[source] ClientError),
    /// An event whose name is neither `head` nor `finalized_checkpoint` was
    /// received; the payload is the offending event name.
    #[error("unexpected event \"{0}\" received")]
    UnknownEvent(String),
    /// The JSON payload of an event could not be deserialized.
    #[error(transparent)]
    EventDeserializationFailure(#[from] serde_json::Error),
    /// Processing of a well-formed event failed. `event` is the event name and
    /// `error` the underlying cause.
    #[error("failed to handle event \"{event}\": {error}")]
    EventHandlingError { event: String, error: anyhow::Error },
}
41
/// Starting state handed to `SSEIndexingTask::run`.
pub struct RunParams {
    /// Last slot known to be synced, if any. On the first `head` event it is
    /// compared with the head slot to decide whether a catch-up task is needed.
    pub last_synced_slot: Option<u32>,
    /// Header of the last synced block, if any. It seeds the synchronizer and
    /// is passed to the catch-up task as its previous block.
    pub last_synced_block: Option<BlockHeader>,
}
46
/// Task that follows the beacon node's SSE stream and indexes blocks as
/// `head` and `finalized_checkpoint` events arrive.
pub struct SSEIndexingTask {
    /// Shared context giving access to the beacon and blobscan clients.
    context: Box<dyn CommonContext>,
    /// Channel used to report a fatal task error to the owner of the task.
    error_report_tx: TaskErrorChannelSender,
}
51
52impl SSEIndexingTask {
53    pub fn new(context: Box<dyn CommonContext>, error_report_tx: TaskErrorChannelSender) -> Self {
54        SSEIndexingTask {
55            context,
56            error_report_tx,
57        }
58    }
59
60    pub fn run(&self, params: RunParams) -> IndexingTaskJoinHandle {
61        let context = self.context.clone();
62        let error_report_tx = self.error_report_tx.clone();
63        let last_synced_block = params.last_synced_block;
64        let last_synced_slot = params.last_synced_slot;
65
66        tokio::spawn(async move {
67            let topics = vec![Topic::Head, Topic::FinalizedCheckpoint];
68            let events = topics
69                .iter()
70                .map(|topic| topic.into())
71                .collect::<Vec<String>>()
72                .join(", ");
73            let sse_indexing_span = info_span!("sse-indexing");
74            let mut last_sse_synced_block = last_synced_block;
75            let mut last_sse_synced_slot = last_synced_slot;
76
77            loop {
78                let result: Result<(), SSEIndexingError> = async {
79                    let mut sse_synchronizer_builder = SynchronizerBuilder::default();
80
81                    if let Some(last_synced_block) = last_sse_synced_block.clone() {
82                        sse_synchronizer_builder.with_last_synced_block(last_synced_block);
83                    }
84
85                    let mut sse_synchronizer = sse_synchronizer_builder.build(context.clone());
86
87                    let mut event_source = context
88                        .beacon_client()
89                        .subscribe_to_events(&topics)
90                        .map_err(SSEIndexingError::FailedSubscription)?;
91
92                    info!("Subscribed to stream events: {}", events);
93
94                    let mut catchup_sync_rx: Option<TaskResultChannelReceiver> = None;
95                    let mut catchup_task_handle: Option<JoinHandle<()>> = None;
96                    let mut is_first_event = true;
97                    let head_event_span = info_span!("head");
98                    let finalized_event_span =
99                        info_span!("finalized_checkpoint");
100
101                    while let Some(event) = event_source.next().await {
102                        match event {
103                            Ok(Event::Open) => {
104                                debug!("Subscrption connection opened")
105                            }
106                            Ok(Event::Message(event)) => {
107                                let event_name = event.event.as_str();
108
109                                match event_name {
110                                    "head" => {
111                                        let head_block_data =
112                                            serde_json::from_str::<HeadEventData>(&event.data)?;
113                                        let head_slot = head_block_data.slot;
114
115                                            if let Some(Ok(_)) = catchup_sync_rx
116                                                .as_mut()
117                                                .and_then(|rx| rx.now_or_never())
118                                            {
119                                                sse_synchronizer
120                                                    .set_checkpoint(Some(CheckpointType::Upper));
121                                                catchup_sync_rx = None;
122                                            }
123
124
125                                        if is_first_event {
126                                            if let Some(last_sse_synced_slot) = last_sse_synced_slot {
127                                                if last_sse_synced_slot < head_slot - 1 {
128                                                    let (channel_tx, channel_rx) =
129                                                        oneshot::channel::<TaskResult>();
130
131                                                    let catchup_task = IndexingTask::new(
132                                                        "catchup",
133                                                        context.clone(),
134                                                        Some(info_span!(parent: None, "catchup"))
135                                                    );
136
137
138                                                    catchup_task_handle = Some(catchup_task.run(IndexingRunParams {
139                                                        error_report_tx: error_report_tx.clone(),
140                                                        result_report_tx: Some(channel_tx),
141                                                        from_block_id: (last_sse_synced_slot + 1)
142                                                            .into(),
143                                                        to_block_id: head_slot.into(),
144                                                        prev_block: last_sse_synced_block.clone(),
145                                                        checkpoint: Some(CheckpointType::Upper),
146                                                    }));
147
148
149                                                    catchup_sync_rx = Some(channel_rx);
150
151                                                    sse_synchronizer.set_checkpoint(None);
152                                                    sse_synchronizer.set_last_synced_block(None);
153                                                }
154                                            }
155                                        }
156
157                                        sse_synchronizer
158                                            .sync_block(head_slot.into())
159                                            .instrument(head_event_span.clone())
160                                            .await
161                                            .map_err(|err| {
162                                                SSEIndexingError::EventHandlingError {
163                                                    event: event.event.clone(),
164                                                    error: err.into(),
165                                                }
166                                            })?;
167
168                                        is_first_event = false;
169                                    }
170                                    "finalized_checkpoint" => {
171                                        async {
172                                            let finalized_checkpoint_data = serde_json::from_str::<
173                                                FinalizedCheckpointEventData,
174                                            >(
175                                                &event.data
176                                            )?;
177
178                                             let block_hash = finalized_checkpoint_data.block;
179                                        let full_block_hash = block_hash.to_full_hex();
180                                        let last_finalized_block_number = match
181                                            context
182                                            .beacon_client()
183                                            .get_block(block_hash.into())
184                                            .await
185                                            .map_err(|err|
186                                                SSEIndexingError::EventHandlingError { event: event.event.clone(), error: anyhow!(
187                                                    "Failed to retrieve finalized block {full_block_hash}: {err}"
188                                                ) }
189                                            )? {
190                                            Some(block) => match block.execution_payload {
191                                                Some(execution_payload) => execution_payload.block_number,
192                                                None => {
193                                                    return Err(
194                                                        SSEIndexingError::EventHandlingError { event: event.event.clone(), error: anyhow!(
195                                                    "Finalized block {full_block_hash} not found"
196                                                ) },
197                                                    )
198                                                }
199                                            },
200                                            None => {
201                                                return Err(
202                                                    SSEIndexingError::EventHandlingError { event: event.event.clone(), error: anyhow!(
203                                                    "Finalized block {full_block_hash} not found"
204                                                ) },
205                                                )
206                                            }
207                                        };
208
209                                        context
210                                            .blobscan_client()
211                                            .update_sync_state(BlockchainSyncState {
212                                                last_finalized_block: Some(last_finalized_block_number),
213                                                last_lower_synced_slot: None,
214                                                last_upper_synced_slot: None,
215                                                last_upper_synced_block_root: None,
216                                                last_upper_synced_block_slot: None,
217                                            })
218                                            .await
219                                            .map_err(|err| SSEIndexingError::EventHandlingError {
220                                                event: event.event,
221                                                error: err.into(),
222                                            })?;
223
224                                        info!(
225                                            finalized_execution_block = last_finalized_block_number,
226                                            "Updated last finalized block number"
227                                        );
228
229                                            Ok::<_, SSEIndexingError>(())
230                                        }
231                                        .instrument(finalized_event_span.clone())
232                                        .await?;
233                                    }
234                                    unexpected_event => {
235                                        return Err(SSEIndexingError::UnknownEvent(
236                                            unexpected_event.into(),
237                                        ));
238                                    }
239                                }
240                            }
241                            Err(error) => {
242                                event_source.close();
243
244                                if let Some(catchup_task_handle) = catchup_task_handle {
245                                    catchup_task_handle.abort();
246                                }
247
248                                if let reqwest_eventsource::Error::StreamEnded = error {
249                                    info!("SSE stream ended. Resubscribing to stream…");
250
251                                    let sync_state = context.blobscan_client().get_sync_state().await.map_err(SSEIndexingError::IndexerStateRetrievalError)?;
252
253                                    last_sse_synced_slot = sync_state.as_ref().and_then(|state| state.last_upper_synced_slot);
254                                    last_sse_synced_block = sync_state.as_ref().and_then(|state| {
255                                        match (
256                                            state.last_upper_synced_block_root,
257                                            state.last_upper_synced_block_slot,
258                                        ) {
259                                            (Some(root), Some(slot)) => Some(BlockHeader {
260                                                parent_root: B256::ZERO,
261                                                root,
262                                                slot,
263                                            }),
264                                            _ => None,
265                                        }
266                                    });
267
268                                    break;
269                                } else {
270                                    return Err(error.into());
271                                }
272                            }
273                        }
274                    }
275
276                    Ok(())
277                }.instrument(sse_indexing_span.clone())
278                .await;
279
280                if let Err(error) = result {
281                    error_report_tx
282                        .send(ErrorResport {
283                            task_name: "sse-indexing".into(),
284                            error: error.into(),
285                        })
286                        .await
287                        .unwrap();
288
289                    break;
290                }
291            }
292        })
293    }
294}