blob_indexer/indexer/tasks/
sse_indexing.rs1use anyhow::anyhow;
2use futures::{FutureExt, StreamExt};
3use reqwest_eventsource::Event;
4use tokio::sync::oneshot;
5use tracing::{debug, info, info_span, Instrument};
6
7use crate::{
8 clients::{
9 beacon::types::{BlockHeader, FinalizedCheckpointEventData, HeadEventData, Topic},
10 blobscan::types::BlockchainSyncState,
11 common::ClientError,
12 },
13 context::CommonContext,
14 indexer::{
15 tasks::indexing::{IndexingTask, RunParams as IndexingRunParams},
16 types::{
17 ErrorResport, IndexingTaskJoinHandle, TaskErrorChannelSender, TaskResult,
18 TaskResultChannelReceiver,
19 },
20 },
21 synchronizer::{CheckpointType, CommonSynchronizer, SynchronizerBuilder},
22 utils::alloy::B256Ext,
23};
24
25#[derive(Debug, thiserror::Error)]
26pub enum SSEIndexingError {
27 #[error("an error ocurred while receiving events from the SSE stream")]
28 ConnectionFailure(#[from] reqwest_eventsource::Error),
29 #[error("failed to subscribe to SSE stream")]
30 FailedSubscription(#[source] ClientError),
31 #[error("unexpected event \"{0}\" received")]
32 UnknownEvent(String),
33 #[error(transparent)]
34 EventDeserializationFailure(#[from] serde_json::Error),
35 #[error("failed to handle event \"{event}\": {error}")]
36 EventHandlingError { event: String, error: anyhow::Error },
37}
38
39pub struct RunParams {
40 pub last_synced_slot: Option<u32>,
41 pub last_synced_block: Option<BlockHeader>,
42}
43
44pub struct SSEIndexingTask {
45 context: Box<dyn CommonContext>,
46 error_report_tx: TaskErrorChannelSender,
47}
48
49impl SSEIndexingTask {
50 pub fn new(context: Box<dyn CommonContext>, error_report_tx: TaskErrorChannelSender) -> Self {
51 SSEIndexingTask {
52 context,
53 error_report_tx,
54 }
55 }
56
57 pub fn run(&self, params: RunParams) -> IndexingTaskJoinHandle {
58 let context = self.context.clone();
59 let error_report_tx = self.error_report_tx.clone();
60 let last_synced_block = params.last_synced_block;
61 let last_synced_slot = params.last_synced_slot;
62
63 tokio::spawn(async move {
64 let mut sse_synchronizer_builder = SynchronizerBuilder::default();
65
66 if let Some(prev_block) = last_synced_block.clone() {
67 sse_synchronizer_builder.with_last_synced_block(prev_block);
68 }
69
70 let mut sse_synchronizer = sse_synchronizer_builder.build(context.clone());
71
72 let topics = vec![Topic::Head, Topic::FinalizedCheckpoint];
73 let events = topics
74 .iter()
75 .map(|topic| topic.into())
76 .collect::<Vec<String>>()
77 .join(", ");
78 let sse_indexing_span = info_span!("sse-indexing");
79
80 loop {
81 let result: Result<(), SSEIndexingError> = async {
82 let mut event_source = context
83 .beacon_client()
84 .subscribe_to_events(&topics)
85 .map_err(SSEIndexingError::FailedSubscription)?;
86
87 info!("Subscribed to stream events: {}", events);
88
89 let mut catchup_sync_rx: Option<TaskResultChannelReceiver> = None;
90 let mut is_first_event = true;
91 let mut catchup_in_progress = false;
92 let head_event_span = info_span!("head");
93 let finalized_event_span =
94 info_span!("finalized_checkpoint");
95
96 while let Some(event) = event_source.next().await {
97 match event {
98 Ok(Event::Open) => {
99 debug!("Subscrption connection opened")
100 }
101 Ok(Event::Message(event)) => {
102 let event_name = event.event.as_str();
103
104 match event_name {
105 "head" => {
106 let head_block_data =
107 serde_json::from_str::<HeadEventData>(&event.data)?;
108 let head_slot = head_block_data.slot;
109
110 if catchup_in_progress {
111 if let Some(Ok(_)) = catchup_sync_rx
112 .as_mut()
113 .and_then(|rx| rx.now_or_never())
114 {
115 sse_synchronizer
116 .set_checkpoint(Some(CheckpointType::Upper));
117 catchup_in_progress = false;
118 }
119 }
120
121 if is_first_event {
122 if let Some(last_synced_slot) = last_synced_slot {
123 if last_synced_slot < head_slot - 1 {
124 let (channel_tx, channel_rx) =
125 oneshot::channel::<TaskResult>();
126
127 let catchup_task = IndexingTask::new(
128 "catchup",
129 context.clone(),
130 Some(info_span!(parent: None, "catchup"))
131 );
132
133 catchup_task.run(IndexingRunParams {
134 error_report_tx: error_report_tx.clone(),
135 result_report_tx: Some(channel_tx),
136 from_block_id: (last_synced_slot + 1)
137 .into(),
138 to_block_id: head_slot.into(),
139 prev_block: last_synced_block.clone(),
140 checkpoint: Some(CheckpointType::Upper),
141 });
142
143 catchup_in_progress = true;
144 catchup_sync_rx = Some(channel_rx);
145
146 sse_synchronizer.set_checkpoint(None);
147 sse_synchronizer.set_last_synced_block(None);
148 }
149 }
150 }
151
152 sse_synchronizer
153 .sync_block(head_slot.into())
154 .instrument(head_event_span.clone())
155 .await
156 .map_err(|err| {
157 SSEIndexingError::EventHandlingError {
158 event: event.event.clone(),
159 error: err.into(),
160 }
161 })?;
162
163 is_first_event = false;
164 }
165 "finalized_checkpoint" => {
166 async {
167 let finalized_checkpoint_data = serde_json::from_str::<
168 FinalizedCheckpointEventData,
169 >(
170 &event.data
171 )?;
172
173 let block_hash = finalized_checkpoint_data.block;
174 let full_block_hash = block_hash.to_full_hex();
175 let last_finalized_block_number = match
176 context
177 .beacon_client()
178 .get_block(block_hash.into())
179 .await
180 .map_err(|err|
181 SSEIndexingError::EventHandlingError { event: event.event.clone(), error: anyhow!(
182 "Failed to retrieve finalized block {full_block_hash}: {err}"
183 ) }
184 )? {
185 Some(block) => match block.execution_payload {
186 Some(execution_payload) => execution_payload.block_number,
187 None => {
188 return Err(
189 SSEIndexingError::EventHandlingError { event: event.event.clone(), error: anyhow!(
190 "Finalized block {full_block_hash} not found"
191 ) },
192 )
193 }
194 },
195 None => {
196 return Err(
197 SSEIndexingError::EventHandlingError { event: event.event.clone(), error: anyhow!(
198 "Finalized block {full_block_hash} not found"
199 ) },
200 )
201 }
202 };
203
204 context
205 .blobscan_client()
206 .update_sync_state(BlockchainSyncState {
207 last_finalized_block: Some(last_finalized_block_number),
208 last_lower_synced_slot: None,
209 last_upper_synced_slot: None,
210 last_upper_synced_block_root: None,
211 last_upper_synced_block_slot: None,
212 })
213 .await
214 .map_err(|err| SSEIndexingError::EventHandlingError {
215 event: event.event,
216 error: err.into(),
217 })?;
218
219 info!(
220 finalized_execution_block = last_finalized_block_number,
221 "Updated last finalized block number"
222 );
223
224 Ok::<_, SSEIndexingError>(())
225 }
226 .instrument(finalized_event_span.clone())
227 .await?;
228 }
229 unexpected_event => {
230 return Err(SSEIndexingError::UnknownEvent(
231 unexpected_event.into(),
232 ));
233 }
234 }
235 }
236 Err(error) => {
237 event_source.close();
238
239 if let reqwest_eventsource::Error::StreamEnded = error {
240 info!("SSE stream ended. Resubscribing to stream…");
241
242 break;
243 } else {
244 return Err(error.into());
245 }
246 }
247 }
248 }
249
250 Ok(())
251 }.instrument(sse_indexing_span.clone())
252 .await;
253
254 if let Err(error) = result {
255 error_report_tx
256 .send(ErrorResport {
257 task_name: "sse-indexing".into(),
258 error: error.into(),
259 })
260 .await
261 .unwrap();
262 }
263 }
264 })
265 }
266}