blob_indexer/indexer/tasks/
indexing.rs

1use tracing::{debug, Instrument, Span};
2
3use crate::{
4    clients::beacon::types::{BlockHeader, BlockId},
5    context::CommonContext,
6    indexer::types::{
7        ErrorResport, IndexingTaskJoinHandle, TaskErrorChannelSender, TaskResultChannelSender,
8    },
9    synchronizer::{CheckpointType, CommonSynchronizer, SynchronizerBuilder},
10};
11
12pub struct IndexingTask {
13    context: Box<dyn CommonContext>,
14    name: String,
15    span: Option<Span>,
16}
17
18pub struct RunParams {
19    pub error_report_tx: TaskErrorChannelSender,
20    pub result_report_tx: Option<TaskResultChannelSender>,
21    pub from_block_id: BlockId,
22    pub to_block_id: BlockId,
23    pub prev_block: Option<BlockHeader>,
24    pub checkpoint: Option<CheckpointType>,
25}
26
27impl IndexingTask {
28    pub fn new(name: &str, context: Box<dyn CommonContext>, span: Option<Span>) -> Self {
29        IndexingTask {
30            context,
31            name: name.into(),
32            span,
33        }
34    }
35
36    pub fn run(&self, params: RunParams) -> IndexingTaskJoinHandle {
37        let context = self.context.clone();
38        let name = self.name.clone();
39        let span = self.span.clone();
40
41        tokio::spawn(async move {
42            let RunParams {
43                error_report_tx,
44                result_report_tx,
45                from_block_id,
46                prev_block,
47                checkpoint,
48                to_block_id,
49            } = params;
50            let mut synchronizer_builder = SynchronizerBuilder::new();
51
52            if let Some(prev_block) = prev_block {
53                synchronizer_builder.with_last_synced_block(prev_block);
54            }
55
56            synchronizer_builder.with_checkpoint(checkpoint);
57
58            let mut synchronizer = synchronizer_builder.build(context);
59
60            let indexing_task_span = span.unwrap_or(tracing::info_span!("indexing-task"));
61
62            async {
63                let result = if from_block_id == to_block_id {
64                    synchronizer.sync_block(from_block_id).await
65                } else {
66                    synchronizer.sync_blocks(from_block_id, to_block_id).await
67                };
68
69                match result {
70                    Ok(sync_result) => {
71                        debug!("Task {name} completed!");
72
73                        if let Some(report_tx) = result_report_tx {
74                            report_tx.send(sync_result).unwrap();
75                        }
76                    }
77                    Err(sync_error) => {
78                        error_report_tx
79                            .send(ErrorResport {
80                                task_name: name,
81                                error: sync_error.into(),
82                            })
83                            .await
84                            .unwrap();
85                    }
86                }
87            }
88            .instrument(indexing_task_span)
89            .await;
90        })
91    }
92}