blob_indexer/indexer/tasks/
sse_indexing.rs

1use anyhow::anyhow;
2use futures::{FutureExt, StreamExt};
3use reqwest_eventsource::Event;
4use tokio::sync::oneshot;
5use tracing::{debug, info, info_span, Instrument};
6
7use crate::{
8    clients::{
9        beacon::types::{BlockHeader, FinalizedCheckpointEventData, HeadEventData, Topic},
10        blobscan::types::BlockchainSyncState,
11        common::ClientError,
12    },
13    context::CommonContext,
14    indexer::{
15        tasks::indexing::{IndexingTask, RunParams as IndexingRunParams},
16        types::{
17            ErrorResport, IndexingTaskJoinHandle, TaskErrorChannelSender, TaskResult,
18            TaskResultChannelReceiver,
19        },
20    },
21    synchronizer::{CheckpointType, CommonSynchronizer, SynchronizerBuilder},
22    utils::alloy::B256Ext,
23};
24
25#[derive(Debug, thiserror::Error)]
26pub enum SSEIndexingError {
27    #[error("an error ocurred while receiving events from the SSE stream")]
28    ConnectionFailure(#[from] reqwest_eventsource::Error),
29    #[error("failed to subscribe to SSE stream")]
30    FailedSubscription(#[source] ClientError),
31    #[error("unexpected event \"{0}\" received")]
32    UnknownEvent(String),
33    #[error(transparent)]
34    EventDeserializationFailure(#[from] serde_json::Error),
35    #[error("failed to handle event \"{event}\": {error}")]
36    EventHandlingError { event: String, error: anyhow::Error },
37}
38
39pub struct RunParams {
40    pub last_synced_slot: Option<u32>,
41    pub last_synced_block: Option<BlockHeader>,
42}
43
44pub struct SSEIndexingTask {
45    context: Box<dyn CommonContext>,
46    error_report_tx: TaskErrorChannelSender,
47}
48
49impl SSEIndexingTask {
50    pub fn new(context: Box<dyn CommonContext>, error_report_tx: TaskErrorChannelSender) -> Self {
51        SSEIndexingTask {
52            context,
53            error_report_tx,
54        }
55    }
56
57    pub fn run(&self, params: RunParams) -> IndexingTaskJoinHandle {
58        let context = self.context.clone();
59        let error_report_tx = self.error_report_tx.clone();
60        let last_synced_block = params.last_synced_block;
61        let last_synced_slot = params.last_synced_slot;
62
63        tokio::spawn(async move {
64            let mut sse_synchronizer_builder = SynchronizerBuilder::default();
65
66            if let Some(prev_block) = last_synced_block.clone() {
67                sse_synchronizer_builder.with_last_synced_block(prev_block);
68            }
69
70            let mut sse_synchronizer = sse_synchronizer_builder.build(context.clone());
71
72            let topics = vec![Topic::Head, Topic::FinalizedCheckpoint];
73            let events = topics
74                .iter()
75                .map(|topic| topic.into())
76                .collect::<Vec<String>>()
77                .join(", ");
78            let sse_indexing_span = info_span!("sse-indexing");
79
80            loop {
81                let result: Result<(), SSEIndexingError> = async {
82                    let mut event_source = context
83                        .beacon_client()
84                        .subscribe_to_events(&topics)
85                        .map_err(SSEIndexingError::FailedSubscription)?;
86
87                    info!("Subscribed to stream events: {}", events);
88
89                    let mut catchup_sync_rx: Option<TaskResultChannelReceiver> = None;
90                    let mut is_first_event = true;
91                    let mut catchup_in_progress = false;
92                    let head_event_span = info_span!("head");
93                    let finalized_event_span =
94                        info_span!("finalized_checkpoint");
95
96                    while let Some(event) = event_source.next().await {
97                        match event {
98                            Ok(Event::Open) => {
99                                debug!("Subscrption connection opened")
100                            }
101                            Ok(Event::Message(event)) => {
102                                let event_name = event.event.as_str();
103
104                                match event_name {
105                                    "head" => {
106                                        let head_block_data =
107                                            serde_json::from_str::<HeadEventData>(&event.data)?;
108                                        let head_slot = head_block_data.slot;
109
110                                        if catchup_in_progress {
111                                            if let Some(Ok(_)) = catchup_sync_rx
112                                                .as_mut()
113                                                .and_then(|rx| rx.now_or_never())
114                                            {
115                                                sse_synchronizer
116                                                    .set_checkpoint(Some(CheckpointType::Upper));
117                                                catchup_in_progress = false;
118                                            }
119                                        }
120
121                                        if is_first_event {
122                                            if let Some(last_synced_slot) = last_synced_slot {
123                                                if last_synced_slot < head_slot - 1 {
124                                                    let (channel_tx, channel_rx) =
125                                                        oneshot::channel::<TaskResult>();
126
127                                                    let catchup_task = IndexingTask::new(
128                                                        "catchup",
129                                                        context.clone(),
130                                                        Some(info_span!(parent: None, "catchup"))
131                                                    );
132
133                                                    catchup_task.run(IndexingRunParams {
134                                                        error_report_tx: error_report_tx.clone(),
135                                                        result_report_tx: Some(channel_tx),
136                                                        from_block_id: (last_synced_slot + 1)
137                                                            .into(),
138                                                        to_block_id: head_slot.into(),
139                                                        prev_block: last_synced_block.clone(),
140                                                        checkpoint: Some(CheckpointType::Upper),
141                                                    });
142
143                                                    catchup_in_progress = true;
144                                                    catchup_sync_rx = Some(channel_rx);
145
146                                                    sse_synchronizer.set_checkpoint(None);
147                                                    sse_synchronizer.set_last_synced_block(None);
148                                                }
149                                            }
150                                        }
151
152                                        sse_synchronizer
153                                            .sync_block(head_slot.into())
154                                            .instrument(head_event_span.clone())
155                                            .await
156                                            .map_err(|err| {
157                                                SSEIndexingError::EventHandlingError {
158                                                    event: event.event.clone(),
159                                                    error: err.into(),
160                                                }
161                                            })?;
162
163                                        is_first_event = false;
164                                    }
165                                    "finalized_checkpoint" => {
166                                        async {
167                                            let finalized_checkpoint_data = serde_json::from_str::<
168                                                FinalizedCheckpointEventData,
169                                            >(
170                                                &event.data
171                                            )?;
172
173                                             let block_hash = finalized_checkpoint_data.block;
174                                        let full_block_hash = block_hash.to_full_hex();
175                                        let last_finalized_block_number = match
176                                            context
177                                            .beacon_client()
178                                            .get_block(block_hash.into())
179                                            .await
180                                            .map_err(|err|
181                                                SSEIndexingError::EventHandlingError { event: event.event.clone(), error: anyhow!(
182                                                    "Failed to retrieve finalized block {full_block_hash}: {err}"
183                                                ) }
184                                            )? {
185                                            Some(block) => match block.execution_payload {
186                                                Some(execution_payload) => execution_payload.block_number,
187                                                None => {
188                                                    return Err(
189                                                        SSEIndexingError::EventHandlingError { event: event.event.clone(), error: anyhow!(
190                                                    "Finalized block {full_block_hash} not found"
191                                                ) },
192                                                    )
193                                                }
194                                            },
195                                            None => {
196                                                return Err(
197                                                    SSEIndexingError::EventHandlingError { event: event.event.clone(), error: anyhow!(
198                                                    "Finalized block {full_block_hash} not found"
199                                                ) },
200                                                )
201                                            }
202                                        };
203
204                                        context
205                                            .blobscan_client()
206                                            .update_sync_state(BlockchainSyncState {
207                                                last_finalized_block: Some(last_finalized_block_number),
208                                                last_lower_synced_slot: None,
209                                                last_upper_synced_slot: None,
210                                                last_upper_synced_block_root: None,
211                                                last_upper_synced_block_slot: None,
212                                            })
213                                            .await
214                                            .map_err(|err| SSEIndexingError::EventHandlingError {
215                                                event: event.event,
216                                                error: err.into(),
217                                            })?;
218
219                                        info!(
220                                            finalized_execution_block = last_finalized_block_number,
221                                            "Updated last finalized block number"
222                                        );
223
224                                            Ok::<_, SSEIndexingError>(())
225                                        }
226                                        .instrument(finalized_event_span.clone())
227                                        .await?;
228                                    }
229                                    unexpected_event => {
230                                        return Err(SSEIndexingError::UnknownEvent(
231                                            unexpected_event.into(),
232                                        ));
233                                    }
234                                }
235                            }
236                            Err(error) => {
237                                event_source.close();
238
239                                if let reqwest_eventsource::Error::StreamEnded = error {
240                                    info!("SSE stream ended. Resubscribing to stream…");
241
242                                    break;
243                                } else {
244                                    return Err(error.into());
245                                }
246                            }
247                        }
248                    }
249
250                    Ok(())
251                }.instrument(sse_indexing_span.clone())
252                .await;
253
254                if let Err(error) = result {
255                    error_report_tx
256                        .send(ErrorResport {
257                            task_name: "sse-indexing".into(),
258                            error: error.into(),
259                        })
260                        .await
261                        .unwrap();
262                }
263            }
264        })
265    }
266}