blob_indexer/indexer/tasks/
indexing.rs1use tracing::{debug, Instrument, Span};
2
3use crate::{
4 clients::beacon::types::{BlockHeader, BlockId},
5 context::CommonContext,
6 indexer::types::{
7 ErrorResport, IndexingTaskJoinHandle, TaskErrorChannelSender, TaskResultChannelSender,
8 },
9 synchronizer::{CheckpointType, CommonSynchronizer, SynchronizerBuilder},
10};
11
12pub struct IndexingTask {
13 context: Box<dyn CommonContext>,
14 name: String,
15 span: Option<Span>,
16}
17
18pub struct RunParams {
19 pub error_report_tx: TaskErrorChannelSender,
20 pub result_report_tx: Option<TaskResultChannelSender>,
21 pub from_block_id: BlockId,
22 pub to_block_id: BlockId,
23 pub prev_block: Option<BlockHeader>,
24 pub checkpoint: Option<CheckpointType>,
25}
26
27impl IndexingTask {
28 pub fn new(name: &str, context: Box<dyn CommonContext>, span: Option<Span>) -> Self {
29 IndexingTask {
30 context,
31 name: name.into(),
32 span,
33 }
34 }
35
36 pub fn run(&self, params: RunParams) -> IndexingTaskJoinHandle {
37 let context = self.context.clone();
38 let name = self.name.clone();
39 let span = self.span.clone();
40
41 tokio::spawn(async move {
42 let RunParams {
43 error_report_tx,
44 result_report_tx,
45 from_block_id,
46 prev_block,
47 checkpoint,
48 to_block_id,
49 } = params;
50 let mut synchronizer_builder = SynchronizerBuilder::new();
51
52 if let Some(prev_block) = prev_block {
53 synchronizer_builder.with_last_synced_block(prev_block);
54 }
55
56 synchronizer_builder.with_checkpoint(checkpoint);
57
58 let mut synchronizer = synchronizer_builder.build(context);
59
60 let indexing_task_span = span.unwrap_or(tracing::info_span!("indexing-task"));
61
62 async {
63 let result = if from_block_id == to_block_id {
64 synchronizer.sync_block(from_block_id).await
65 } else {
66 synchronizer.sync_blocks(from_block_id, to_block_id).await
67 };
68
69 match result {
70 Ok(sync_result) => {
71 debug!("Task {name} completed!");
72
73 if let Some(report_tx) = result_report_tx {
74 report_tx.send(sync_result).unwrap();
75 }
76 }
77 Err(sync_error) => {
78 error_report_tx
79 .send(ErrorResport {
80 task_name: name,
81 error: sync_error.into(),
82 })
83 .await
84 .unwrap();
85 }
86 }
87 }
88 .instrument(indexing_task_span)
89 .await;
90 })
91 }
92}