blob_indexer/synchronizer/
mod.rs

1use std::fmt::Debug;
2
3use anyhow::anyhow;
4use async_trait::async_trait;
5use futures::future::join_all;
6use tokio::task::JoinHandle;
7use tracing::{debug, info, Instrument};
8
9#[cfg(test)]
10use mockall::automock;
11
12use crate::{
13    clients::{
14        beacon::types::{BlockHeader, BlockId, BlockIdResolution},
15        blobscan::types::BlockchainSyncState,
16    },
17    context::CommonContext,
18    slots_processor::{error::SlotsProcessorError, SlotsProcessor},
19};
20
21use self::error::{SlotsChunksErrors, SynchronizerError};
22
23pub mod error;
24
25pub type SynchronizerResult = Result<(), SynchronizerError>;
26
27#[async_trait]
28#[cfg_attr(test, automock)]
29pub trait CommonSynchronizer: Send + Sync {
30    fn set_checkpoint(&mut self, checkpoint: Option<CheckpointType>);
31    fn set_last_synced_block(&mut self, last_synced_block: Option<BlockHeader>);
32    async fn sync_block(&mut self, block_id: BlockId) -> SynchronizerResult;
33    async fn sync_blocks(
34        &mut self,
35        initial_block_id: BlockId,
36        final_block_id: BlockId,
37    ) -> SynchronizerResult;
38}
39
40#[derive(Debug)]
41pub struct SynchronizerBuilder {
42    min_slots_per_thread: u32,
43    checkpoint: Option<CheckpointType>,
44    last_synced_block: Option<BlockHeader>,
45}
46
47pub struct Synchronizer {
48    context: Box<dyn CommonContext>,
49    min_slots_per_thread: u32,
50    checkpoint: Option<CheckpointType>,
51    last_synced_block: Option<BlockHeader>,
52}
53
54#[derive(Clone, Copy, Debug, PartialEq)]
55pub enum CheckpointType {
56    Lower,
57    Upper,
58}
59
60impl Default for SynchronizerBuilder {
61    fn default() -> Self {
62        SynchronizerBuilder {
63            min_slots_per_thread: 50,
64            checkpoint: Some(CheckpointType::Upper),
65            last_synced_block: None,
66        }
67    }
68}
69
70impl SynchronizerBuilder {
71    pub fn new() -> Self {
72        SynchronizerBuilder::default()
73    }
74
75    pub fn with_checkpoint(&mut self, checkpoint: Option<CheckpointType>) -> &mut Self {
76        self.checkpoint = checkpoint;
77
78        self
79    }
80
81    pub fn with_last_synced_block(&mut self, last_synced_block: BlockHeader) -> &mut Self {
82        self.last_synced_block = Some(last_synced_block);
83
84        self
85    }
86
87    pub fn build(&self, context: Box<dyn CommonContext>) -> Synchronizer {
88        Synchronizer {
89            context,
90            min_slots_per_thread: self.min_slots_per_thread,
91            checkpoint: self.checkpoint,
92            last_synced_block: self.last_synced_block.clone(),
93        }
94    }
95}
96
97impl Synchronizer {
98    async fn process_slots(
99        &mut self,
100        from_slot: u32,
101        to_slot: u32,
102    ) -> Result<(), SynchronizerError> {
103        let is_reverse_sync = to_slot < from_slot;
104        let unprocessed_slots = to_slot.abs_diff(from_slot);
105        let min_slots_per_thread = std::cmp::min(unprocessed_slots, self.min_slots_per_thread);
106        let slots_per_thread = std::cmp::max(
107            min_slots_per_thread,
108            unprocessed_slots / self.context.syncing_settings().concurrency,
109        );
110        let num_threads = std::cmp::max(1, unprocessed_slots / slots_per_thread);
111        let remaining_slots = unprocessed_slots % num_threads;
112
113        let mut handles: Vec<JoinHandle<Result<Option<BlockHeader>, SlotsProcessorError>>> = vec![];
114
115        for i in 0..num_threads {
116            let is_first_thread = i == 0;
117            let is_last_thread = i == num_threads - 1;
118            let thread_total_slots =
119                slots_per_thread + if is_last_thread { remaining_slots } else { 0 };
120            let thread_initial_slot = if is_reverse_sync {
121                from_slot - i * slots_per_thread
122            } else {
123                from_slot + i * slots_per_thread
124            };
125            let thread_final_slot = if is_reverse_sync {
126                thread_initial_slot - thread_total_slots
127            } else {
128                thread_initial_slot + thread_total_slots
129            };
130
131            let synchronizer_thread_span = tracing::debug_span!(
132                parent:  &tracing::Span::current(),
133                "thread",
134                thread = i,
135                chunk_initial_slot = thread_initial_slot,
136                chunk_final_slot = thread_final_slot
137            );
138
139            let last_processed_block_header = if is_first_thread {
140                self.last_synced_block.clone()
141            } else {
142                None
143            };
144            let mut slots_processor =
145                SlotsProcessor::new(self.context.clone(), last_processed_block_header);
146
147            let handle = tokio::spawn(
148                async move {
149                    slots_processor
150                        .process_slots(thread_initial_slot, thread_final_slot)
151                        .await?;
152
153                    Ok(slots_processor.last_processed_block)
154                }
155                .instrument(synchronizer_thread_span)
156                .in_current_span(),
157            );
158
159            handles.push(handle);
160        }
161
162        let handle_outputs = join_all(handles).await;
163
164        let mut errors = vec![];
165        let mut last_thread_block: Option<BlockHeader> = None;
166
167        for handle in handle_outputs {
168            match handle {
169                Ok(thread_result) => match thread_result {
170                    Ok(thread_block_header) => {
171                        if let Some(block_header) = thread_block_header {
172                            last_thread_block = Some(block_header);
173                        }
174                    }
175                    Err(error) => errors.push(error),
176                },
177                Err(error) => {
178                    let err = anyhow!("Synchronizer thread panicked: {:?}", error);
179
180                    errors.push(err.into());
181                }
182            }
183        }
184
185        if !errors.is_empty() {
186            return Err(SynchronizerError::FailedParallelSlotsProcessing {
187                initial_slot: from_slot,
188                final_slot: to_slot,
189                chunk_errors: SlotsChunksErrors(errors),
190            });
191        }
192
193        if let Some(last_thread_block) = last_thread_block {
194            self.last_synced_block = Some(last_thread_block);
195        }
196
197        Ok(())
198    }
199
200    async fn process_slots_by_checkpoints(
201        &mut self,
202        initial_slot: u32,
203        final_slot: u32,
204    ) -> Result<(), SynchronizerError> {
205        let is_reverse_sync = final_slot < initial_slot;
206        let mut current_slot = initial_slot;
207        let mut unprocessed_slots = final_slot.abs_diff(current_slot);
208
209        if unprocessed_slots == 1 {
210            info!(slot = initial_slot, "Syncing {unprocessed_slots} slot…");
211        } else {
212            info!(
213                initial_slot,
214                final_slot, "Syncing {unprocessed_slots} slots…"
215            );
216        }
217
218        while unprocessed_slots > 0 {
219            let checkpoint_size = self.context.syncing_settings().checkpoint_size;
220            let slots_chunk = std::cmp::min(unprocessed_slots, checkpoint_size);
221            let initial_chunk_slot = current_slot;
222            let final_chunk_slot = if is_reverse_sync {
223                current_slot - slots_chunk
224            } else {
225                current_slot + slots_chunk
226            };
227
228            let sync_slots_chunk_span = tracing::debug_span!(
229                parent: &tracing::Span::current(),
230                "checkpoint",
231                checkpoint_initial_slot = initial_chunk_slot,
232                checkpoint_final_slot = final_chunk_slot
233            );
234
235            self.process_slots(initial_chunk_slot, final_chunk_slot)
236                .instrument(sync_slots_chunk_span)
237                .await?;
238
239            let last_slot = Some(if is_reverse_sync {
240                final_chunk_slot + 1
241            } else {
242                final_chunk_slot - 1
243            });
244
245            let checkpointing_enabled = !self.context.syncing_settings().disable_checkpoints;
246
247            if checkpointing_enabled {
248                if let Some(checkpoint) = self.checkpoint {
249                    let mut last_lower_synced_slot = None;
250                    let mut last_upper_synced_slot = None;
251                    let mut last_upper_synced_block_root = None;
252                    let mut last_upper_synced_block_slot = None;
253
254                    if checkpoint == CheckpointType::Lower {
255                        last_lower_synced_slot = last_slot;
256                    } else if checkpoint == CheckpointType::Upper {
257                        last_upper_synced_slot = last_slot;
258                        last_upper_synced_block_root =
259                            self.last_synced_block.as_ref().map(|block| block.root);
260                        last_upper_synced_block_slot =
261                            self.last_synced_block.as_ref().map(|block| block.slot);
262                    }
263
264                    if let Err(error) = self
265                        .context
266                        .blobscan_client()
267                        .update_sync_state(BlockchainSyncState {
268                            last_finalized_block: None,
269                            last_lower_synced_slot,
270                            last_upper_synced_slot,
271                            last_upper_synced_block_root,
272                            last_upper_synced_block_slot,
273                        })
274                        .await
275                    {
276                        let new_synced_slot = match last_lower_synced_slot.or(last_upper_synced_slot) {
277                                Some(slot) => slot,
278                                None => return Err(SynchronizerError::Other(anyhow!(
279                                    "Failed to get new last synced slot: last_lower_synced_slot and last_upper_synced_slot are both None"
280                                )))
281                            };
282
283                        return Err(SynchronizerError::FailedSlotCheckpointSave {
284                            slot: new_synced_slot,
285                            error,
286                        });
287                    }
288
289                    if unprocessed_slots >= checkpoint_size {
290                        debug!(
291                            new_last_lower_synced_slot = last_lower_synced_slot,
292                            new_last_upper_synced_slot = last_upper_synced_slot,
293                            "Checkpoint reached. Last synced slot saved…"
294                        );
295                    }
296                }
297            }
298
299            current_slot = if is_reverse_sync {
300                current_slot - slots_chunk
301            } else {
302                current_slot + slots_chunk
303            };
304
305            unprocessed_slots -= slots_chunk;
306        }
307
308        Ok(())
309    }
310}
311
312#[async_trait]
313impl CommonSynchronizer for Synchronizer {
314    fn set_checkpoint(&mut self, checkpoint: Option<CheckpointType>) {
315        self.checkpoint = checkpoint;
316    }
317
318    fn set_last_synced_block(&mut self, last_synced_block: Option<BlockHeader>) {
319        self.last_synced_block = last_synced_block;
320    }
321
322    async fn sync_block(&mut self, block_id: BlockId) -> SynchronizerResult {
323        let final_slot = block_id
324            .resolve_to_slot(self.context.beacon_client())
325            .await?;
326
327        self.process_slots_by_checkpoints(final_slot, final_slot + 1)
328            .await?;
329
330        Ok(())
331    }
332
333    async fn sync_blocks(
334        &mut self,
335        initial_block_id: BlockId,
336        final_block_id: BlockId,
337    ) -> SynchronizerResult {
338        let initial_slot = initial_block_id
339            .resolve_to_slot(self.context.beacon_client())
340            .await?;
341        let mut final_slot = final_block_id
342            .resolve_to_slot(self.context.beacon_client())
343            .await?;
344
345        if initial_slot == final_slot {
346            return Ok(());
347        }
348
349        loop {
350            self.process_slots_by_checkpoints(initial_slot, final_slot)
351                .await?;
352
353            let latest_final_slot = final_block_id
354                .resolve_to_slot(self.context.beacon_client())
355                .await?;
356
357            if final_slot == latest_final_slot {
358                return Ok(());
359            }
360
361            final_slot = latest_final_slot;
362        }
363    }
364}