blob_indexer/synchronizer/
mod.rs1use std::fmt::Debug;
2
3use anyhow::anyhow;
4use async_trait::async_trait;
5use futures::future::join_all;
6use tokio::task::JoinHandle;
7use tracing::{debug, info, Instrument};
8
9#[cfg(test)]
10use mockall::automock;
11
12use crate::{
13 clients::{
14 beacon::types::{BlockHeader, BlockId, BlockIdResolution},
15 blobscan::types::BlockchainSyncState,
16 },
17 context::CommonContext,
18 slots_processor::{error::SlotsProcessorError, SlotsProcessor},
19};
20
21use self::error::{SlotsChunksErrors, SynchronizerError};
22
23pub mod error;
24
25pub type SynchronizerResult = Result<(), SynchronizerError>;
26
27#[async_trait]
28#[cfg_attr(test, automock)]
29pub trait CommonSynchronizer: Send + Sync {
30 fn set_checkpoint(&mut self, checkpoint: Option<CheckpointType>);
31 fn set_last_synced_block(&mut self, last_synced_block: Option<BlockHeader>);
32 async fn sync_block(&mut self, block_id: BlockId) -> SynchronizerResult;
33 async fn sync_blocks(
34 &mut self,
35 initial_block_id: BlockId,
36 final_block_id: BlockId,
37 ) -> SynchronizerResult;
38}
39
40#[derive(Debug)]
41pub struct SynchronizerBuilder {
42 min_slots_per_thread: u32,
43 checkpoint: Option<CheckpointType>,
44 last_synced_block: Option<BlockHeader>,
45}
46
47pub struct Synchronizer {
48 context: Box<dyn CommonContext>,
49 min_slots_per_thread: u32,
50 checkpoint: Option<CheckpointType>,
51 last_synced_block: Option<BlockHeader>,
52}
53
54#[derive(Clone, Copy, Debug, PartialEq)]
55pub enum CheckpointType {
56 Lower,
57 Upper,
58}
59
60impl Default for SynchronizerBuilder {
61 fn default() -> Self {
62 SynchronizerBuilder {
63 min_slots_per_thread: 50,
64 checkpoint: Some(CheckpointType::Upper),
65 last_synced_block: None,
66 }
67 }
68}
69
70impl SynchronizerBuilder {
71 pub fn new() -> Self {
72 SynchronizerBuilder::default()
73 }
74
75 pub fn with_checkpoint(&mut self, checkpoint: Option<CheckpointType>) -> &mut Self {
76 self.checkpoint = checkpoint;
77
78 self
79 }
80
81 pub fn with_last_synced_block(&mut self, last_synced_block: BlockHeader) -> &mut Self {
82 self.last_synced_block = Some(last_synced_block);
83
84 self
85 }
86
87 pub fn build(&self, context: Box<dyn CommonContext>) -> Synchronizer {
88 Synchronizer {
89 context,
90 min_slots_per_thread: self.min_slots_per_thread,
91 checkpoint: self.checkpoint,
92 last_synced_block: self.last_synced_block.clone(),
93 }
94 }
95}
96
97impl Synchronizer {
98 async fn process_slots(
99 &mut self,
100 from_slot: u32,
101 to_slot: u32,
102 ) -> Result<(), SynchronizerError> {
103 let is_reverse_sync = to_slot < from_slot;
104 let unprocessed_slots = to_slot.abs_diff(from_slot);
105 let min_slots_per_thread = std::cmp::min(unprocessed_slots, self.min_slots_per_thread);
106 let slots_per_thread = std::cmp::max(
107 min_slots_per_thread,
108 unprocessed_slots / self.context.syncing_settings().concurrency,
109 );
110 let num_threads = std::cmp::max(1, unprocessed_slots / slots_per_thread);
111 let remaining_slots = unprocessed_slots % num_threads;
112
113 let mut handles: Vec<JoinHandle<Result<Option<BlockHeader>, SlotsProcessorError>>> = vec![];
114
115 for i in 0..num_threads {
116 let is_first_thread = i == 0;
117 let is_last_thread = i == num_threads - 1;
118 let thread_total_slots =
119 slots_per_thread + if is_last_thread { remaining_slots } else { 0 };
120 let thread_initial_slot = if is_reverse_sync {
121 from_slot - i * slots_per_thread
122 } else {
123 from_slot + i * slots_per_thread
124 };
125 let thread_final_slot = if is_reverse_sync {
126 thread_initial_slot - thread_total_slots
127 } else {
128 thread_initial_slot + thread_total_slots
129 };
130
131 let synchronizer_thread_span = tracing::debug_span!(
132 parent: &tracing::Span::current(),
133 "thread",
134 thread = i,
135 chunk_initial_slot = thread_initial_slot,
136 chunk_final_slot = thread_final_slot
137 );
138
139 let last_processed_block_header = if is_first_thread {
140 self.last_synced_block.clone()
141 } else {
142 None
143 };
144 let mut slots_processor =
145 SlotsProcessor::new(self.context.clone(), last_processed_block_header);
146
147 let handle = tokio::spawn(
148 async move {
149 slots_processor
150 .process_slots(thread_initial_slot, thread_final_slot)
151 .await?;
152
153 Ok(slots_processor.last_processed_block)
154 }
155 .instrument(synchronizer_thread_span)
156 .in_current_span(),
157 );
158
159 handles.push(handle);
160 }
161
162 let handle_outputs = join_all(handles).await;
163
164 let mut errors = vec![];
165 let mut last_thread_block: Option<BlockHeader> = None;
166
167 for handle in handle_outputs {
168 match handle {
169 Ok(thread_result) => match thread_result {
170 Ok(thread_block_header) => {
171 if let Some(block_header) = thread_block_header {
172 last_thread_block = Some(block_header);
173 }
174 }
175 Err(error) => errors.push(error),
176 },
177 Err(error) => {
178 let err = anyhow!("Synchronizer thread panicked: {:?}", error);
179
180 errors.push(err.into());
181 }
182 }
183 }
184
185 if !errors.is_empty() {
186 return Err(SynchronizerError::FailedParallelSlotsProcessing {
187 initial_slot: from_slot,
188 final_slot: to_slot,
189 chunk_errors: SlotsChunksErrors(errors),
190 });
191 }
192
193 if let Some(last_thread_block) = last_thread_block {
194 self.last_synced_block = Some(last_thread_block);
195 }
196
197 Ok(())
198 }
199
200 async fn process_slots_by_checkpoints(
201 &mut self,
202 initial_slot: u32,
203 final_slot: u32,
204 ) -> Result<(), SynchronizerError> {
205 let is_reverse_sync = final_slot < initial_slot;
206 let mut current_slot = initial_slot;
207 let mut unprocessed_slots = final_slot.abs_diff(current_slot);
208
209 if unprocessed_slots == 1 {
210 info!(slot = initial_slot, "Syncing {unprocessed_slots} slot…");
211 } else {
212 info!(
213 initial_slot,
214 final_slot, "Syncing {unprocessed_slots} slots…"
215 );
216 }
217
218 while unprocessed_slots > 0 {
219 let checkpoint_size = self.context.syncing_settings().checkpoint_size;
220 let slots_chunk = std::cmp::min(unprocessed_slots, checkpoint_size);
221 let initial_chunk_slot = current_slot;
222 let final_chunk_slot = if is_reverse_sync {
223 current_slot - slots_chunk
224 } else {
225 current_slot + slots_chunk
226 };
227
228 let sync_slots_chunk_span = tracing::debug_span!(
229 parent: &tracing::Span::current(),
230 "checkpoint",
231 checkpoint_initial_slot = initial_chunk_slot,
232 checkpoint_final_slot = final_chunk_slot
233 );
234
235 self.process_slots(initial_chunk_slot, final_chunk_slot)
236 .instrument(sync_slots_chunk_span)
237 .await?;
238
239 let last_slot = Some(if is_reverse_sync {
240 final_chunk_slot + 1
241 } else {
242 final_chunk_slot - 1
243 });
244
245 let checkpointing_enabled = !self.context.syncing_settings().disable_checkpoints;
246
247 if checkpointing_enabled {
248 if let Some(checkpoint) = self.checkpoint {
249 let mut last_lower_synced_slot = None;
250 let mut last_upper_synced_slot = None;
251 let mut last_upper_synced_block_root = None;
252 let mut last_upper_synced_block_slot = None;
253
254 if checkpoint == CheckpointType::Lower {
255 last_lower_synced_slot = last_slot;
256 } else if checkpoint == CheckpointType::Upper {
257 last_upper_synced_slot = last_slot;
258 last_upper_synced_block_root =
259 self.last_synced_block.as_ref().map(|block| block.root);
260 last_upper_synced_block_slot =
261 self.last_synced_block.as_ref().map(|block| block.slot);
262 }
263
264 if let Err(error) = self
265 .context
266 .blobscan_client()
267 .update_sync_state(BlockchainSyncState {
268 last_finalized_block: None,
269 last_lower_synced_slot,
270 last_upper_synced_slot,
271 last_upper_synced_block_root,
272 last_upper_synced_block_slot,
273 })
274 .await
275 {
276 let new_synced_slot = match last_lower_synced_slot.or(last_upper_synced_slot) {
277 Some(slot) => slot,
278 None => return Err(SynchronizerError::Other(anyhow!(
279 "Failed to get new last synced slot: last_lower_synced_slot and last_upper_synced_slot are both None"
280 )))
281 };
282
283 return Err(SynchronizerError::FailedSlotCheckpointSave {
284 slot: new_synced_slot,
285 error,
286 });
287 }
288
289 if unprocessed_slots >= checkpoint_size {
290 debug!(
291 new_last_lower_synced_slot = last_lower_synced_slot,
292 new_last_upper_synced_slot = last_upper_synced_slot,
293 "Checkpoint reached. Last synced slot saved…"
294 );
295 }
296 }
297 }
298
299 current_slot = if is_reverse_sync {
300 current_slot - slots_chunk
301 } else {
302 current_slot + slots_chunk
303 };
304
305 unprocessed_slots -= slots_chunk;
306 }
307
308 Ok(())
309 }
310}
311
312#[async_trait]
313impl CommonSynchronizer for Synchronizer {
314 fn set_checkpoint(&mut self, checkpoint: Option<CheckpointType>) {
315 self.checkpoint = checkpoint;
316 }
317
318 fn set_last_synced_block(&mut self, last_synced_block: Option<BlockHeader>) {
319 self.last_synced_block = last_synced_block;
320 }
321
322 async fn sync_block(&mut self, block_id: BlockId) -> SynchronizerResult {
323 let final_slot = block_id
324 .resolve_to_slot(self.context.beacon_client())
325 .await?;
326
327 self.process_slots_by_checkpoints(final_slot, final_slot + 1)
328 .await?;
329
330 Ok(())
331 }
332
333 async fn sync_blocks(
334 &mut self,
335 initial_block_id: BlockId,
336 final_block_id: BlockId,
337 ) -> SynchronizerResult {
338 let initial_slot = initial_block_id
339 .resolve_to_slot(self.context.beacon_client())
340 .await?;
341 let mut final_slot = final_block_id
342 .resolve_to_slot(self.context.beacon_client())
343 .await?;
344
345 if initial_slot == final_slot {
346 return Ok(());
347 }
348
349 loop {
350 self.process_slots_by_checkpoints(initial_slot, final_slot)
351 .await?;
352
353 let latest_final_slot = final_block_id
354 .resolve_to_slot(self.context.beacon_client())
355 .await?;
356
357 if final_slot == latest_final_slot {
358 return Ok(());
359 }
360
361 final_slot = latest_final_slot;
362 }
363 }
364}