blob_indexer/indexer/tasks/
sse_indexing.rs1use alloy::primitives::B256;
2use anyhow::anyhow;
3use futures::{FutureExt, StreamExt};
4use reqwest_eventsource::Event;
5use tokio::{sync::oneshot, task::JoinHandle};
6use tracing::{debug, info, info_span, Instrument};
7
8use crate::{
9 clients::{
10 beacon::types::{BlockHeader, FinalizedCheckpointEventData, HeadEventData, Topic},
11 blobscan::types::BlockchainSyncState,
12 common::ClientError,
13 },
14 context::CommonContext,
15 indexer::{
16 tasks::indexing::{IndexingTask, RunParams as IndexingRunParams},
17 types::{
18 ErrorResport, IndexingTaskJoinHandle, TaskErrorChannelSender, TaskResult,
19 TaskResultChannelReceiver,
20 },
21 },
22 synchronizer::{CheckpointType, CommonSynchronizer, SynchronizerBuilder},
23 utils::alloy::B256Ext,
24};
25
26#[derive(Debug, thiserror::Error)]
27pub enum SSEIndexingError {
28 #[error("an error ocurred while receiving events from the SSE stream")]
29 ConnectionFailure(#[from] reqwest_eventsource::Error),
30 #[error("failed to subscribe to SSE stream")]
31 FailedSubscription(#[source] ClientError),
32 #[error("failed to fetch indexer state")]
33 IndexerStateRetrievalError(#[source] ClientError),
34 #[error("unexpected event \"{0}\" received")]
35 UnknownEvent(String),
36 #[error(transparent)]
37 EventDeserializationFailure(#[from] serde_json::Error),
38 #[error("failed to handle event \"{event}\": {error}")]
39 EventHandlingError { event: String, error: anyhow::Error },
40}
41
42pub struct RunParams {
43 pub last_synced_slot: Option<u32>,
44 pub last_synced_block: Option<BlockHeader>,
45}
46
47pub struct SSEIndexingTask {
48 context: Box<dyn CommonContext>,
49 error_report_tx: TaskErrorChannelSender,
50}
51
52impl SSEIndexingTask {
53 pub fn new(context: Box<dyn CommonContext>, error_report_tx: TaskErrorChannelSender) -> Self {
54 SSEIndexingTask {
55 context,
56 error_report_tx,
57 }
58 }
59
60 pub fn run(&self, params: RunParams) -> IndexingTaskJoinHandle {
61 let context = self.context.clone();
62 let error_report_tx = self.error_report_tx.clone();
63 let last_synced_block = params.last_synced_block;
64 let last_synced_slot = params.last_synced_slot;
65
66 tokio::spawn(async move {
67 let topics = vec![Topic::Head, Topic::FinalizedCheckpoint];
68 let events = topics
69 .iter()
70 .map(|topic| topic.into())
71 .collect::<Vec<String>>()
72 .join(", ");
73 let sse_indexing_span = info_span!("sse-indexing");
74 let mut last_sse_synced_block = last_synced_block;
75 let mut last_sse_synced_slot = last_synced_slot;
76
77 loop {
78 let result: Result<(), SSEIndexingError> = async {
79 let mut sse_synchronizer_builder = SynchronizerBuilder::default();
80
81 if let Some(last_synced_block) = last_sse_synced_block.clone() {
82 sse_synchronizer_builder.with_last_synced_block(last_synced_block);
83 }
84
85 let mut sse_synchronizer = sse_synchronizer_builder.build(context.clone());
86
87 let mut event_source = context
88 .beacon_client()
89 .subscribe_to_events(&topics)
90 .map_err(SSEIndexingError::FailedSubscription)?;
91
92 info!("Subscribed to stream events: {}", events);
93
94 let mut catchup_sync_rx: Option<TaskResultChannelReceiver> = None;
95 let mut catchup_task_handle: Option<JoinHandle<()>> = None;
96 let mut is_first_event = true;
97 let head_event_span = info_span!("head");
98 let finalized_event_span =
99 info_span!("finalized_checkpoint");
100
101 while let Some(event) = event_source.next().await {
102 match event {
103 Ok(Event::Open) => {
104 debug!("Subscrption connection opened")
105 }
106 Ok(Event::Message(event)) => {
107 let event_name = event.event.as_str();
108
109 match event_name {
110 "head" => {
111 let head_block_data =
112 serde_json::from_str::<HeadEventData>(&event.data)?;
113 let head_slot = head_block_data.slot;
114
115 if let Some(Ok(_)) = catchup_sync_rx
116 .as_mut()
117 .and_then(|rx| rx.now_or_never())
118 {
119 sse_synchronizer
120 .set_checkpoint(Some(CheckpointType::Upper));
121 catchup_sync_rx = None;
122 }
123
124
125 if is_first_event {
126 if let Some(last_sse_synced_slot) = last_sse_synced_slot {
127 if last_sse_synced_slot < head_slot - 1 {
128 let (channel_tx, channel_rx) =
129 oneshot::channel::<TaskResult>();
130
131 let catchup_task = IndexingTask::new(
132 "catchup",
133 context.clone(),
134 Some(info_span!(parent: None, "catchup"))
135 );
136
137
138 catchup_task_handle = Some(catchup_task.run(IndexingRunParams {
139 error_report_tx: error_report_tx.clone(),
140 result_report_tx: Some(channel_tx),
141 from_block_id: (last_sse_synced_slot + 1)
142 .into(),
143 to_block_id: head_slot.into(),
144 prev_block: last_sse_synced_block.clone(),
145 checkpoint: Some(CheckpointType::Upper),
146 }));
147
148
149 catchup_sync_rx = Some(channel_rx);
150
151 sse_synchronizer.set_checkpoint(None);
152 sse_synchronizer.set_last_synced_block(None);
153 }
154 }
155 }
156
157 sse_synchronizer
158 .sync_block(head_slot.into())
159 .instrument(head_event_span.clone())
160 .await
161 .map_err(|err| {
162 SSEIndexingError::EventHandlingError {
163 event: event.event.clone(),
164 error: err.into(),
165 }
166 })?;
167
168 is_first_event = false;
169 }
170 "finalized_checkpoint" => {
171 async {
172 let finalized_checkpoint_data = serde_json::from_str::<
173 FinalizedCheckpointEventData,
174 >(
175 &event.data
176 )?;
177
178 let block_hash = finalized_checkpoint_data.block;
179 let full_block_hash = block_hash.to_full_hex();
180 let last_finalized_block_number = match
181 context
182 .beacon_client()
183 .get_block(block_hash.into())
184 .await
185 .map_err(|err|
186 SSEIndexingError::EventHandlingError { event: event.event.clone(), error: anyhow!(
187 "Failed to retrieve finalized block {full_block_hash}: {err}"
188 ) }
189 )? {
190 Some(block) => match block.execution_payload {
191 Some(execution_payload) => execution_payload.block_number,
192 None => {
193 return Err(
194 SSEIndexingError::EventHandlingError { event: event.event.clone(), error: anyhow!(
195 "Finalized block {full_block_hash} not found"
196 ) },
197 )
198 }
199 },
200 None => {
201 return Err(
202 SSEIndexingError::EventHandlingError { event: event.event.clone(), error: anyhow!(
203 "Finalized block {full_block_hash} not found"
204 ) },
205 )
206 }
207 };
208
209 context
210 .blobscan_client()
211 .update_sync_state(BlockchainSyncState {
212 last_finalized_block: Some(last_finalized_block_number),
213 last_lower_synced_slot: None,
214 last_upper_synced_slot: None,
215 last_upper_synced_block_root: None,
216 last_upper_synced_block_slot: None,
217 })
218 .await
219 .map_err(|err| SSEIndexingError::EventHandlingError {
220 event: event.event,
221 error: err.into(),
222 })?;
223
224 info!(
225 finalized_execution_block = last_finalized_block_number,
226 "Updated last finalized block number"
227 );
228
229 Ok::<_, SSEIndexingError>(())
230 }
231 .instrument(finalized_event_span.clone())
232 .await?;
233 }
234 unexpected_event => {
235 return Err(SSEIndexingError::UnknownEvent(
236 unexpected_event.into(),
237 ));
238 }
239 }
240 }
241 Err(error) => {
242 event_source.close();
243
244 if let Some(catchup_task_handle) = catchup_task_handle {
245 catchup_task_handle.abort();
246 }
247
248 if let reqwest_eventsource::Error::StreamEnded = error {
249 info!("SSE stream ended. Resubscribing to stream…");
250
251 let sync_state = context.blobscan_client().get_sync_state().await.map_err(SSEIndexingError::IndexerStateRetrievalError)?;
252
253 last_sse_synced_slot = sync_state.as_ref().and_then(|state| state.last_upper_synced_slot);
254 last_sse_synced_block = sync_state.as_ref().and_then(|state| {
255 match (
256 state.last_upper_synced_block_root,
257 state.last_upper_synced_block_slot,
258 ) {
259 (Some(root), Some(slot)) => Some(BlockHeader {
260 parent_root: B256::ZERO,
261 root,
262 slot,
263 }),
264 _ => None,
265 }
266 });
267
268 break;
269 } else {
270 return Err(error.into());
271 }
272 }
273 }
274 }
275
276 Ok(())
277 }.instrument(sse_indexing_span.clone())
278 .await;
279
280 if let Err(error) = result {
281 error_report_tx
282 .send(ErrorResport {
283 task_name: "sse-indexing".into(),
284 error: error.into(),
285 })
286 .await
287 .unwrap();
288
289 break;
290 }
291 }
292 })
293 }
294}