Skip to main content

ethrex_p2p/sync/
snap_sync.rs

1//! Snap sync implementation
2//!
3//! This module contains the logic for snap synchronization mode where state is
4//! fetched via snap p2p requests while blocks and receipts are fetched in parallel.
5
6use std::collections::{BTreeSet, HashMap, HashSet};
7use std::path::Path;
8#[cfg(feature = "rocksdb")]
9use std::path::PathBuf;
10use std::sync::Arc;
11use std::sync::atomic::Ordering;
12use std::time::{Duration, SystemTime};
13
14use ethrex_blockchain::Blockchain;
15use ethrex_common::types::{AccountState, BlockHeader, Code};
16use ethrex_common::{
17    H256,
18    constants::{EMPTY_KECCAK_HASH, EMPTY_TRIE_HASH},
19};
20use ethrex_rlp::decode::RLPDecode;
21use ethrex_storage::Store;
22#[cfg(feature = "rocksdb")]
23use ethrex_trie::Trie;
24use rayon::iter::{ParallelBridge, ParallelIterator};
25use tracing::{debug, error, info, warn};
26
27use crate::metrics::{CurrentStepValue, METRICS};
28use crate::peer_handler::PeerHandler;
29use crate::peer_table::PeerTableServerProtocol as _;
30use crate::rlpx::p2p::SUPPORTED_ETH_CAPABILITIES;
31use crate::snap::{
32    async_fs,
33    constants::{
34        BYTECODE_CHUNK_SIZE, MAX_HEADER_FETCH_ATTEMPTS, MIN_FULL_BLOCKS, MISSING_SLOTS_PERCENTAGE,
35        SECONDS_PER_BLOCK, SNAP_LIMIT,
36    },
37    request_account_range, request_bytecodes, request_storage_ranges,
38};
39use crate::sync::code_collector::CodeHashCollector;
40use crate::sync::healing::{heal_state_trie_wrap, heal_storage_trie};
41use crate::utils::{
42    current_unix_time, get_account_state_snapshots_dir, get_account_storages_snapshots_dir,
43    get_code_hashes_snapshots_dir,
44};
45
46use super::{AccountStorageRoots, SyncError};
47
48#[cfg(not(feature = "rocksdb"))]
49use ethrex_common::U256;
50#[cfg(not(feature = "rocksdb"))]
51use ethrex_rlp::encode::RLPEncode;
52
53/// Persisted State during the Block Sync phase for SnapSync
54#[derive(Clone)]
55pub struct SnapBlockSyncState {
56    pub block_hashes: Vec<H256>,
57    store: Store,
58}
59
60impl SnapBlockSyncState {
61    pub fn new(store: Store) -> Self {
62        Self {
63            block_hashes: Vec::new(),
64            store,
65        }
66    }
67
68    /// Obtain the current head from where to start or resume block sync
69    pub async fn get_current_head(&self) -> Result<H256, SyncError> {
70        if let Some(head) = self.store.get_header_download_checkpoint().await? {
71            // A checkpoint pointing at a header we never stored (a crash
72            // between the checkpoint write and the header write) would make
73            // every resume fail with the same recoverable error, retrying
74            // forever. Restart header download from the canonical head
75            // instead; the checkpoint is overwritten by the first stored
76            // batch.
77            if self.store.get_block_number(head).await?.is_some() {
78                return Ok(head);
79            }
80            warn!(
81                checkpoint = %head,
82                "Header download checkpoint has no stored header; restarting header download from the canonical head"
83            );
84        }
85        self.store
86            .get_latest_canonical_block_hash()
87            .await?
88            .ok_or(SyncError::NoLatestCanonical)
89    }
90
91    /// Stores incoming headers to the Store and saves their hashes
92    pub async fn process_incoming_headers(
93        &mut self,
94        block_headers: impl Iterator<Item = BlockHeader>,
95    ) -> Result<(), SyncError> {
96        let mut block_headers_vec = Vec::with_capacity(block_headers.size_hint().1.unwrap_or(0));
97        let mut block_hashes = Vec::with_capacity(block_headers.size_hint().1.unwrap_or(0));
98        for header in block_headers {
99            block_hashes.push(header.hash());
100            block_headers_vec.push(header);
101        }
102        let checkpoint = *block_hashes.last().ok_or(SyncError::InvalidRangeReceived)?;
103        // Store the headers before the checkpoint that references them: a
104        // crash after the headers commit only re-downloads a suffix, while a
105        // crash after a checkpoint-first write leaves a dangling checkpoint.
106        self.store.add_block_headers(block_headers_vec).await?;
107        self.store
108            .set_header_download_checkpoint(checkpoint)
109            .await?;
110        self.block_hashes.extend_from_slice(&block_hashes);
111        Ok(())
112    }
113}
114
115/// Performs snap sync cycle - fetches state via snap protocol while downloading blocks in parallel
116pub async fn sync_cycle_snap(
117    peers: &mut PeerHandler,
118    blockchain: Arc<Blockchain>,
119    snap_enabled: &std::sync::atomic::AtomicBool,
120    sync_head: H256,
121    store: Store,
122    datadir: &Path,
123    diagnostics: &Arc<tokio::sync::RwLock<super::SyncDiagnostics>>,
124) -> Result<(), SyncError> {
125    // Request all block headers between the current head and the sync head
126    // We will begin from the current head so that we download the earliest state first
127    // This step is not parallelized
128    let mut block_sync_state = SnapBlockSyncState::new(store.clone());
129    // Check if we have some blocks downloaded from a previous sync attempt
130    // This applies only to snap sync—full sync always starts fetching headers
131    // from the canonical block, which updates as new block headers are fetched.
132    let mut current_head = block_sync_state.get_current_head().await?;
133    let mut current_head_number = store
134        .get_block_number(current_head)
135        .await?
136        .ok_or(SyncError::BlockNumber(current_head))?;
137    {
138        let mut diag = diagnostics.write().await;
139        diag.current_phase = "headers".to_string();
140        diag.sync_mode = "snap".to_string();
141    }
142    debug!(
143        "Syncing from current head {:?} to sync_head {:?}",
144        current_head, sync_head
145    );
146    let pending_block = match store.get_pending_block(sync_head).await {
147        Ok(res) => res,
148        Err(e) => return Err(e.into()),
149    };
150
151    let mut attempts = 0;
152
153    loop {
154        // Prune dead/unresponsive peers periodically to allow replacements to be promoted
155        let _ = peers.peer_table.prune_table();
156
157        debug!("Requesting Block Headers from {current_head}");
158
159        let Some(mut block_headers) = peers
160            .request_block_headers(current_head_number, sync_head)
161            .await?
162        else {
163            if attempts >= MAX_HEADER_FETCH_ATTEMPTS {
164                warn!(
165                    "Sync failed to find target block header after {attempts} attempts, aborting to wait for a newer sync head"
166                );
167                return Ok(());
168            }
169            attempts += 1;
170            debug!(
171                "Failed to fetch headers for sync head (attempt {attempts}/{MAX_HEADER_FETCH_ATTEMPTS}), retrying in 2s"
172            );
173            tokio::time::sleep(Duration::from_secs(2)).await;
174            continue;
175        };
176        // Reset failure counter on success so it tracks consecutive failures
177        attempts = 0;
178
179        debug!("Sync Log 1: In snap sync");
180        debug!(
181            "Sync Log 2: State block hashes len {}",
182            block_sync_state.block_hashes.len()
183        );
184
185        let (first_block_hash, first_block_number, first_block_parent_hash) =
186            match block_headers.first() {
187                Some(header) => (header.hash(), header.number, header.parent_hash),
188                None => continue,
189            };
190        let (last_block_hash, last_block_number) = match block_headers.last() {
191            Some(header) => (header.hash(), header.number),
192            None => continue,
193        };
194        // TODO(#2126): This is just a temporary solution to avoid a bug where the sync would get stuck
195        // on a loop when the target head is not found, i.e. on a reorg with a side-chain.
196        if first_block_hash == last_block_hash
197            && first_block_hash == current_head
198            && current_head != sync_head
199        {
200            // There is no path to the sync head this goes back until it find a common ancerstor
201            warn!("Sync failed to find target block header, going back to the previous parent");
202            current_head = first_block_parent_hash;
203            continue;
204        }
205
206        debug!(
207            "Received {} block headers| First Number: {} Last Number: {}",
208            block_headers.len(),
209            first_block_number,
210            last_block_number
211        );
212
213        // If we have a pending block from new_payload request
214        // attach it to the end if it matches the parent_hash of the latest received header
215        if let Some(ref block) = pending_block
216            && block.header.parent_hash == last_block_hash
217        {
218            block_headers.push(block.header.clone());
219        }
220
221        // Filter out everything after the sync_head
222        let mut sync_head_found = false;
223        if let Some(index) = block_headers
224            .iter()
225            .position(|header| header.hash() == sync_head)
226        {
227            sync_head_found = true;
228            block_headers.drain(index + 1..);
229        }
230
231        // Update current fetch head
232        current_head = last_block_hash;
233        current_head_number = last_block_number;
234
235        // If the sync head is not 0 we search to fullsync
236        let head_found = sync_head_found && store.get_latest_block_number().await? > 0;
237        // Or the head is very close to 0. A pre-check in `sync.rs::sync_cycle`
238        // also gates on `< MIN_FULL_BLOCKS`; keep both — this one stays as a
239        // safety net for callers that enter `sync_cycle_snap` directly.
240        let head_close_to_0 = last_block_number < MIN_FULL_BLOCKS;
241
242        if head_found || head_close_to_0 {
243            // Too few blocks for a snap sync, switching to full sync
244            info!("Sync head is found, switching to FullSync");
245            snap_enabled.store(false, Ordering::Relaxed);
246            return super::full::sync_cycle_full(
247                peers,
248                blockchain,
249                tokio_util::sync::CancellationToken::new(),
250                sync_head,
251                store.clone(),
252                diagnostics,
253            )
254            .await;
255        }
256
257        // Discard the first header as we already have it
258        if block_headers.len() > 1 {
259            let block_headers_iter = block_headers.into_iter().skip(1);
260
261            block_sync_state
262                .process_incoming_headers(block_headers_iter)
263                .await?;
264        }
265
266        // Update diagnostics with header progress
267        {
268            let mut diag = diagnostics.write().await;
269            diag.phase_progress.insert(
270                "headers_downloaded".to_string(),
271                block_sync_state.block_hashes.len() as u64,
272            );
273        }
274
275        if sync_head_found {
276            break;
277        };
278    }
279
280    snap_sync(peers, &store, &mut block_sync_state, datadir, diagnostics).await?;
281
282    store.clear_snap_state().await?;
283    snap_enabled.store(false, Ordering::Relaxed);
284
285    Ok(())
286}
287
288/// Main snap sync logic - downloads state via snap protocol
289pub async fn snap_sync(
290    peers: &mut PeerHandler,
291    store: &Store,
292    block_sync_state: &mut SnapBlockSyncState,
293    datadir: &Path,
294    diagnostics: &Arc<tokio::sync::RwLock<super::SyncDiagnostics>>,
295) -> Result<(), SyncError> {
296    // snap-sync: launch tasks to fetch blocks and state in parallel
297    // - Fetch each block's body and its receipt via eth p2p requests
298    // - Fetch the pivot block's state via snap p2p requests
299    // - Execute blocks after the pivot (like in full-sync)
300    let pivot_hash = block_sync_state
301        .block_hashes
302        .last()
303        .ok_or(SyncError::NoBlockHeaders)?;
304    let mut pivot_header = store
305        .get_block_header_by_hash(*pivot_hash)?
306        .ok_or(SyncError::CorruptDB)?;
307
308    while block_is_stale(&pivot_header) {
309        pivot_header = update_pivot(
310            pivot_header.number,
311            pivot_header.timestamp,
312            peers,
313            block_sync_state,
314            diagnostics,
315        )
316        .await?;
317    }
318    debug!(
319        "Selected block {} as pivot for snap sync",
320        pivot_header.number
321    );
322    {
323        let mut diag = diagnostics.write().await;
324        diag.pivot_block_number = Some(pivot_header.number);
325        diag.pivot_timestamp = Some(pivot_header.timestamp);
326        let pivot_age = current_unix_time().saturating_sub(pivot_header.timestamp);
327        diag.pivot_age_seconds = Some(pivot_age);
328        diag.staleness_threshold_seconds = (SNAP_LIMIT as u64) * SECONDS_PER_BLOCK;
329        diag.sync_mode = "snap".to_string();
330        METRICS
331            .pivot_timestamp
332            .store(pivot_header.timestamp, std::sync::atomic::Ordering::Relaxed);
333    }
334
335    let state_root = pivot_header.state_root;
336    let account_state_snapshots_dir = get_account_state_snapshots_dir(datadir);
337    let account_storages_snapshots_dir = get_account_storages_snapshots_dir(datadir);
338
339    let code_hashes_snapshot_dir = get_code_hashes_snapshots_dir(datadir);
340    async_fs::ensure_dir_exists(&code_hashes_snapshot_dir).await?;
341
342    // Create collector to store code hashes in files
343    let mut code_hash_collector: CodeHashCollector =
344        CodeHashCollector::new(code_hashes_snapshot_dir.clone());
345
346    let mut storage_accounts = AccountStorageRoots::default();
347    if !std::env::var("SKIP_START_SNAP_SYNC").is_ok_and(|var| !var.is_empty()) {
348        // We start by downloading all of the leafs of the trie of accounts
349        // The function request_account_range writes the leafs into files in
350        // account_state_snapshots_dir
351
352        diagnostics.write().await.current_phase = "account_ranges".to_string();
353        request_account_range(
354            peers,
355            H256::zero(),
356            H256::repeat_byte(0xff),
357            account_state_snapshots_dir.as_ref(),
358            &mut pivot_header,
359            block_sync_state,
360            diagnostics,
361        )
362        .await?;
363        debug!("Finished downloading account ranges from peers");
364
365        {
366            let mut diag = diagnostics.write().await;
367            diag.current_phase = "account_insertion".to_string();
368            diag.phase_progress.insert(
369                "account_ranges_downloaded".to_string(),
370                METRICS
371                    .downloaded_account_tries
372                    .load(std::sync::atomic::Ordering::Relaxed),
373            );
374        }
375        *METRICS.account_tries_insert_start_time.lock().await = Some(SystemTime::now());
376        METRICS
377            .current_step
378            .set(CurrentStepValue::InsertingAccountRanges);
379        // We read the account leafs from the files in account_state_snapshots_dir, write it into
380        // the trie to compute the nodes and stores the accounts with storages for later use
381
382        // Variable `accounts_with_storage` unused if not in rocksdb
383        #[allow(unused_variables)]
384        let (computed_state_root, accounts_with_storage) = insert_accounts(
385            store.clone(),
386            &mut storage_accounts,
387            &account_state_snapshots_dir,
388            datadir,
389            &mut code_hash_collector,
390        )
391        .await?;
392        debug!(
393            "Finished inserting account ranges, total storage accounts: {}",
394            storage_accounts.accounts_with_storage_root.len()
395        );
396        *METRICS.account_tries_insert_end_time.lock().await = Some(SystemTime::now());
397
398        debug!("Original state root: {state_root:?}");
399        debug!("Computed state root after request_account_ranges: {computed_state_root:?}");
400
401        diagnostics.write().await.current_phase = "storage_ranges".to_string();
402        *METRICS.storage_tries_download_start_time.lock().await = Some(SystemTime::now());
403        // We start downloading the storage leafs. To do so, we need to be sure that the storage root
404        // is correct. To do so, we always heal the state trie before requesting storage rates
405        let mut chunk_index = 0_u64;
406        let mut state_leafs_healed = 0_u64;
407        let mut storage_range_request_attempts = 0;
408        loop {
409            while block_is_stale(&pivot_header) {
410                pivot_header = update_pivot(
411                    pivot_header.number,
412                    pivot_header.timestamp,
413                    peers,
414                    block_sync_state,
415                    diagnostics,
416                )
417                .await?;
418            }
419            // heal_state_trie_wrap returns false if we ran out of time before fully healing the trie
420            // We just need to update the pivot and start again
421            if !heal_state_trie_wrap(
422                pivot_header.state_root,
423                store.clone(),
424                peers,
425                calculate_staleness_timestamp(pivot_header.timestamp),
426                &mut state_leafs_healed,
427                &mut storage_accounts,
428                &mut code_hash_collector,
429            )
430            .await?
431            {
432                continue;
433            };
434
435            debug!(
436                "Started request_storage_ranges with {} accounts with storage root unchanged",
437                storage_accounts.accounts_with_storage_root.len()
438            );
439            storage_range_request_attempts += 1;
440            if storage_range_request_attempts < 5 {
441                chunk_index = request_storage_ranges(
442                    peers,
443                    &mut storage_accounts,
444                    account_storages_snapshots_dir.as_ref(),
445                    chunk_index,
446                    &mut pivot_header,
447                    store.clone(),
448                )
449                .await?;
450            } else {
451                for (acc_hash, (maybe_root, old_intervals)) in
452                    storage_accounts.accounts_with_storage_root.iter()
453                {
454                    // When we fall into this case what happened is there are certain accounts for which
455                    // the storage root went back to a previous value we already had, and thus could not download
456                    // their storage leaves because we were using an old value for their storage root.
457                    // The fallback is to ensure we mark it for storage healing.
458                    storage_accounts.healed_accounts.insert(*acc_hash);
459                    debug!(
460                        "We couldn't download these accounts on request_storage_ranges. Falling back to storage healing for it.
461                        Account hash: {:x?}, {:x?}. Number of intervals {}",
462                        acc_hash,
463                        maybe_root,
464                        old_intervals.len()
465                    );
466                }
467
468                warn!(
469                    "Storage could not be downloaded after multiple attempts. Marking for healing. This could impact snap sync time (healing may take a while)."
470                );
471
472                storage_accounts.accounts_with_storage_root.clear();
473            }
474
475            debug!(
476                "Ended request_storage_ranges with {} accounts with storage root unchanged and not downloaded yet and with {} big/healed accounts",
477                storage_accounts.accounts_with_storage_root.len(),
478                // These accounts are marked as heals if they're a big account. This is
479                // because we don't know if the storage root is still valid
480                storage_accounts.healed_accounts.len(),
481            );
482            if !block_is_stale(&pivot_header) {
483                break;
484            }
485            debug!("Pivot became stale during storage download, restarting loop");
486        }
487        debug!("Finished request_storage_ranges");
488        *METRICS.storage_tries_download_end_time.lock().await = Some(SystemTime::now());
489
490        diagnostics.write().await.current_phase = "storage_insertion".to_string();
491        *METRICS.storage_tries_insert_start_time.lock().await = Some(SystemTime::now());
492        METRICS
493            .current_step
494            .set(CurrentStepValue::InsertingStorageRanges);
495        let account_storages_snapshots_dir = get_account_storages_snapshots_dir(datadir);
496
497        insert_storages(
498            store.clone(),
499            accounts_with_storage,
500            &account_storages_snapshots_dir,
501            datadir,
502        )
503        .await?;
504
505        *METRICS.storage_tries_insert_end_time.lock().await = Some(SystemTime::now());
506
507        debug!("Finished storing storage tries");
508    }
509
510    diagnostics.write().await.current_phase = "healing".to_string();
511    *METRICS.heal_start_time.lock().await = Some(SystemTime::now());
512    debug!("Starting healing process");
513    let mut global_state_leafs_healed: u64 = 0;
514    let mut global_storage_leafs_healed: u64 = 0;
515    let mut healing_done = false;
516    while !healing_done {
517        // This if is an edge case for the skip snap sync scenario
518        if block_is_stale(&pivot_header) {
519            pivot_header = update_pivot(
520                pivot_header.number,
521                pivot_header.timestamp,
522                peers,
523                block_sync_state,
524                diagnostics,
525            )
526            .await?;
527        }
528        healing_done = heal_state_trie_wrap(
529            pivot_header.state_root,
530            store.clone(),
531            peers,
532            calculate_staleness_timestamp(pivot_header.timestamp),
533            &mut global_state_leafs_healed,
534            &mut storage_accounts,
535            &mut code_hash_collector,
536        )
537        .await?;
538        if !healing_done {
539            continue;
540        }
541        healing_done = heal_storage_trie(
542            pivot_header.state_root,
543            &storage_accounts,
544            peers,
545            store.clone(),
546            HashMap::new(),
547            calculate_staleness_timestamp(pivot_header.timestamp),
548            &mut global_storage_leafs_healed,
549        )
550        .await?;
551    }
552    *METRICS.heal_end_time.lock().await = Some(SystemTime::now());
553
554    store.generate_flatkeyvalue()?;
555
556    debug_assert!(validate_state_root(store.clone(), pivot_header.state_root).await);
557    debug_assert!(validate_storage_root(store.clone(), pivot_header.state_root).await);
558
559    debug!("Finished healing");
560
561    // Finish code hash collection
562    code_hash_collector.finish().await?;
563
564    *METRICS.bytecode_download_start_time.lock().await = Some(SystemTime::now());
565
566    let code_hashes_dir = get_code_hashes_snapshots_dir(datadir);
567    let mut seen_code_hashes = HashSet::new();
568    let mut code_hashes_to_download = Vec::new();
569
570    diagnostics.write().await.current_phase = "bytecodes".to_string();
571    debug!("Starting download code hashes from peers");
572    let code_hash_files = async_fs::read_dir_paths(&code_hashes_dir).await?;
573    for file_path in code_hash_files {
574        let snapshot_contents = async_fs::read_file(&file_path).await?;
575        let code_hashes: Vec<H256> = RLPDecode::decode(&snapshot_contents)
576            .map_err(|_| SyncError::CodeHashesSnapshotDecodeError(file_path))?;
577
578        for hash in code_hashes {
579            // If we haven't seen the code hash yet, add it to the list of hashes to download
580            if seen_code_hashes.insert(hash) {
581                code_hashes_to_download.push(hash);
582
583                if code_hashes_to_download.len() >= BYTECODE_CHUNK_SIZE {
584                    debug!(
585                        "Starting bytecode download of {} hashes",
586                        code_hashes_to_download.len()
587                    );
588                    let bytecodes = request_bytecodes(peers, &code_hashes_to_download)
589                        .await?
590                        .ok_or(SyncError::BytecodesNotFound)?;
591
592                    store
593                        .write_account_code_batch(
594                            code_hashes_to_download
595                                .drain(..)
596                                .zip(bytecodes)
597                                // SAFETY: hash already checked by the download worker
598                                .map(|(hash, code)| {
599                                    (hash, Code::from_bytecode_unchecked(code, hash))
600                                })
601                                .collect(),
602                        )
603                        .await?;
604                }
605            }
606        }
607    }
608
609    // Download remaining bytecodes if any
610    if !code_hashes_to_download.is_empty() {
611        let bytecodes = request_bytecodes(peers, &code_hashes_to_download)
612            .await?
613            .ok_or(SyncError::BytecodesNotFound)?;
614        store
615            .write_account_code_batch(
616                code_hashes_to_download
617                    .drain(..)
618                    .zip(bytecodes)
619                    // SAFETY: hash already checked by the download worker
620                    .map(|(hash, code)| (hash, Code::from_bytecode_unchecked(code, hash)))
621                    .collect(),
622            )
623            .await?;
624    }
625
626    async_fs::remove_dir_all(&code_hashes_dir).await?;
627
628    *METRICS.bytecode_download_end_time.lock().await = Some(SystemTime::now());
629
630    debug_assert!(validate_bytecodes(store.clone(), pivot_header.state_root));
631
632    store_block_bodies(vec![pivot_header.clone()], peers.clone(), store.clone()).await?;
633
634    let block = store
635        .get_block_by_hash(pivot_header.hash())
636        .await?
637        .ok_or(SyncError::CorruptDB)?;
638
639    store.add_block(block).await?;
640
641    let numbers_and_hashes = block_sync_state
642        .block_hashes
643        .iter()
644        .rev()
645        .enumerate()
646        .map(|(i, hash)| (pivot_header.number - i as u64, *hash))
647        .collect::<Vec<_>>();
648
649    store
650        .forkchoice_update(
651            numbers_and_hashes,
652            pivot_header.number,
653            pivot_header.hash(),
654            None,
655            None,
656        )
657        .await?;
658    Ok(())
659}
660
661/// Fetches all block bodies for the given block headers via p2p and stores them
662pub async fn store_block_bodies(
663    mut block_headers: Vec<BlockHeader>,
664    mut peers: PeerHandler,
665    store: Store,
666) -> Result<(), SyncError> {
667    loop {
668        debug!("Requesting Block Bodies ");
669        if let Some(block_bodies) = peers.request_block_bodies(&block_headers).await? {
670            debug!(" Received {} Block Bodies", block_bodies.len());
671            // Track which bodies we have already fetched
672            let current_block_headers = block_headers.drain(..block_bodies.len());
673            // Add bodies to storage
674            for (hash, body) in current_block_headers
675                .map(|h| h.hash())
676                .zip(block_bodies.into_iter())
677            {
678                store.add_block_body(hash, body).await?;
679            }
680
681            // Check if we need to ask for another batch
682            if block_headers.is_empty() {
683                break;
684            }
685        }
686    }
687    Ok(())
688}
689
690pub async fn update_pivot(
691    block_number: u64,
692    block_timestamp: u64,
693    peers: &mut PeerHandler,
694    block_sync_state: &mut SnapBlockSyncState,
695    diagnostics: &Arc<tokio::sync::RwLock<super::SyncDiagnostics>>,
696) -> Result<BlockHeader, SyncError> {
697    /// Maximum number of full peer rotations before giving up. With rotation,
698    /// each pass tries every eligible peer once; the budget scales naturally
699    /// with network size. Between rotations we back off exponentially.
700    const MAX_ROTATIONS: u64 = 5;
701    const INITIAL_RETRY_DELAY: Duration = Duration::from_secs(1);
702    const MAX_RETRY_DELAY: Duration = Duration::from_secs(30);
703
704    // We multiply the estimation by 0.9 in order to account for missing slots (~9% in tesnets)
705    let new_pivot_block_number = block_number
706        + ((current_unix_time().saturating_sub(block_timestamp) / SECONDS_PER_BLOCK) as f64
707            * MISSING_SLOTS_PERCENTAGE) as u64;
708    debug!(
709        "Current pivot is stale (number: {}, timestamp: {}). New pivot number: {}",
710        block_number, block_timestamp, new_pivot_block_number
711    );
712
713    let mut rotation_count: u64 = 0;
714    // Track peers that already failed this rotation so we try every eligible
715    // peer once before retrying any. When the rotation is exhausted, clear
716    // and start a new one.
717    let mut excluded_peers: Vec<H256> = Vec::new();
718
719    loop {
720        if rotation_count >= MAX_ROTATIONS {
721            #[cfg(feature = "metrics")]
722            ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("max_failures");
723            diagnostics
724                .write()
725                .await
726                .push_pivot_change(super::PivotChangeEvent {
727                    timestamp: current_unix_time(),
728                    old_pivot_number: block_number,
729                    new_pivot_number: new_pivot_block_number,
730                    outcome: "max_failures".to_string(),
731                    failure_reason: Some(format!("Exhausted {MAX_ROTATIONS} full rotations")),
732                });
733            return Err(SyncError::PeerHandler(
734                crate::peer_handler::PeerHandlerError::BlockHeaders,
735            ));
736        }
737
738        // Exponential backoff: doubles each rotation, capped at MAX_RETRY_DELAY
739        if rotation_count > 0 {
740            let delay = INITIAL_RETRY_DELAY.saturating_mul(1 << rotation_count.min(4));
741            let delay = delay.min(MAX_RETRY_DELAY);
742            debug!(
743                "update_pivot: backing off for {}s (rotation={rotation_count})",
744                delay.as_secs()
745            );
746            tokio::time::sleep(delay).await;
747        }
748
749        // One permit per attempt: consumed by `get_block_header` below.
750        let Some((peer_id, mut connection, permit)) = peers
751            .peer_table
752            .get_best_peer_excluding(SUPPORTED_ETH_CAPABILITIES.to_vec(), excluded_peers.clone())
753            .await?
754        else {
755            // Distinguish "rotation exhausted" from "no peers currently eligible
756            // (all at capacity)". Read-only probe — does not bump `requests`.
757            let any_eligible = peers
758                .peer_table
759                .has_eligible_peer(SUPPORTED_ETH_CAPABILITIES.to_vec())
760                .await?;
761
762            if !any_eligible {
763                debug!("update_pivot: no eligible peers available, waiting");
764                #[cfg(feature = "metrics")]
765                ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("no_peers");
766                tokio::time::sleep(Duration::from_secs(1)).await;
767            } else if excluded_peers.is_empty() {
768                // Peers exist but none match — shouldn't happen in practice
769                debug!("update_pivot: peers exist but none selectable, retrying");
770                tokio::time::sleep(Duration::from_secs(1)).await;
771            } else {
772                // All non-excluded peers were already tried — rotation done
773                debug!(
774                    "update_pivot: rotation {rotation_count} complete ({} peers tried), starting next",
775                    excluded_peers.len()
776                );
777                excluded_peers.clear();
778                rotation_count = rotation_count.saturating_add(1);
779            }
780            continue;
781        };
782
783        let peer_score = peers.peer_table.get_score(peer_id).await?;
784        let diag = peers.read_peer_diagnostics().await;
785        let eligible_count = diag.iter().filter(|p| p.eligible).count();
786        let total_count = diag.len();
787        debug!(
788            eligible_peers = eligible_count,
789            total_peers = total_count,
790            selected_peer = %peer_id,
791            peer_score = peer_score,
792            excluded_count = excluded_peers.len(),
793            rotation = rotation_count,
794            "update_pivot: attempting with peer"
795        );
796        debug!(
797            "Trying to update pivot to {new_pivot_block_number} with peer {peer_id} (score: {peer_score})"
798        );
799
800        // One attempt per peer per rotation. A peer that fails is excluded for
801        // this rotation and will be retried (with backoff) in the next one.
802        let outcome = peers
803            .get_block_header(&mut connection, permit, new_pivot_block_number)
804            .await;
805
806        match outcome {
807            Ok(Some(pivot)) => {
808                peers.peer_table.record_success(peer_id)?;
809                #[cfg(feature = "metrics")]
810                ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("success");
811                info!("Snap sync pivot updated to block {}", pivot.number);
812
813                {
814                    let mut diag = diagnostics.write().await;
815                    diag.push_pivot_change(super::PivotChangeEvent {
816                        timestamp: current_unix_time(),
817                        old_pivot_number: block_number,
818                        new_pivot_number: pivot.number,
819                        outcome: "success".to_string(),
820                        failure_reason: None,
821                    });
822                    diag.pivot_block_number = Some(pivot.number);
823                    diag.pivot_timestamp = Some(pivot.timestamp);
824                    let pivot_age = current_unix_time().saturating_sub(pivot.timestamp);
825                    diag.pivot_age_seconds = Some(pivot_age);
826                    METRICS
827                        .pivot_timestamp
828                        .store(pivot.timestamp, std::sync::atomic::Ordering::Relaxed);
829                }
830                let block_headers = peers
831                    .request_block_headers(block_number + 1, pivot.hash())
832                    .await?
833                    .ok_or(SyncError::NoBlockHeaders)?;
834                block_sync_state
835                    .process_incoming_headers(block_headers.into_iter())
836                    .await?;
837                *METRICS.sync_head_hash.lock().await = pivot.hash();
838                return Ok(pivot);
839            }
840            Ok(None) => {
841                peers.peer_table.record_failure(peer_id)?;
842                let peer_score = peers.peer_table.get_score(peer_id).await?;
843                debug!(
844                    "update_pivot: peer {peer_id} returned None (score: {peer_score}), excluding for this rotation"
845                );
846                #[cfg(feature = "metrics")]
847                ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("peer_none");
848                excluded_peers.push(peer_id);
849            }
850            Err(e) if e.is_recoverable() => {
851                peers.peer_table.record_failure(peer_id)?;
852                debug!("update_pivot: peer {peer_id} failed with {e}, excluding for this rotation");
853                #[cfg(feature = "metrics")]
854                ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("peer_error");
855                excluded_peers.push(peer_id);
856            }
857            Err(e) => {
858                // Non-recoverable error (e.g., dead peer table actor,
859                // storage full) — surface it.
860                return Err(SyncError::PeerHandler(e));
861            }
862        }
863    }
864}
865
866pub fn block_is_stale(block_header: &BlockHeader) -> bool {
867    let threshold = calculate_staleness_timestamp(block_header.timestamp);
868    let now = current_unix_time();
869    let is_stale = threshold < now;
870    if is_stale {
871        let pivot_age = now.saturating_sub(block_header.timestamp);
872        let staleness_limit = (SNAP_LIMIT as u64) * SECONDS_PER_BLOCK;
873        debug!(
874            pivot_number = block_header.number,
875            pivot_timestamp = block_header.timestamp,
876            pivot_age_seconds = pivot_age,
877            staleness_threshold_seconds = staleness_limit,
878            "Pivot block detected as stale"
879        );
880    }
881    is_stale
882}
883
884pub fn calculate_staleness_timestamp(timestamp: u64) -> u64 {
885    timestamp + (SNAP_LIMIT as u64 * 12)
886}
887
888pub async fn validate_state_root(store: Store, state_root: H256) -> bool {
889    info!("Starting validate_state_root");
890    let validated = tokio::task::spawn_blocking(move || {
891        store
892            .open_locked_state_trie(state_root)
893            .expect("couldn't open trie")
894            .validate()
895    })
896    .await
897    .expect("We should be able to create threads");
898
899    if validated.is_ok() {
900        info!("Successfully validated tree, {state_root} found");
901    } else {
902        error!("We have failed the validation of the state tree");
903        std::process::exit(1);
904    }
905    validated.is_ok()
906}
907
908pub async fn validate_storage_root(store: Store, state_root: H256) -> bool {
909    info!("Starting validate_storage_root");
910    let is_valid = tokio::task::spawn_blocking(move || {
911        store
912            .iter_accounts(state_root)
913            .expect("couldn't iterate accounts")
914            .par_bridge()
915            .try_for_each(|(hashed_address, account_state)| {
916                let store_clone = store.clone();
917                store_clone
918                    .open_locked_storage_trie(
919                        hashed_address,
920                        state_root,
921                        account_state.storage_root,
922                    )
923                    .expect("couldn't open storage trie")
924                    .validate()
925            })
926    })
927    .await
928    .expect("We should be able to create threads");
929    info!("Finished validate_storage_root");
930    if is_valid.is_err() {
931        std::process::exit(1);
932    }
933    is_valid.is_ok()
934}
935
936pub fn validate_bytecodes(store: Store, state_root: H256) -> bool {
937    info!("Starting validate_bytecodes");
938    let mut is_valid = true;
939    for (account_hash, account_state) in store
940        .iter_accounts(state_root)
941        .expect("we couldn't iterate over accounts")
942    {
943        if account_state.code_hash != *EMPTY_KECCAK_HASH
944            && !store
945                .get_account_code(account_state.code_hash)
946                .is_ok_and(|code| code.is_some())
947        {
948            error!(
949                "Missing code hash {:x} for account {:x}",
950                account_state.code_hash, account_hash
951            );
952            is_valid = false
953        }
954    }
955    if !is_valid {
956        std::process::exit(1);
957    }
958    is_valid
959}
960
961// ============================================================================
962// Account and Storage Insertion (non-rocksdb)
963// ============================================================================
964
/// Result of `compute_storage_roots` for one account: the account hash
/// paired with the storage-trie changes collected for it, each keyed by its
/// nibble path.
// NOTE(review): the `Vec<u8>` payload is presumably the encoded trie node —
// confirm against `collect_changes_since_last_hash`.
#[cfg(not(feature = "rocksdb"))]
type StorageRoots = (H256, Vec<(ethrex_trie::Nibbles, Vec<u8>)>);
967
968#[cfg(not(feature = "rocksdb"))]
969fn compute_storage_roots(
970    store: Store,
971    account_hash: H256,
972    key_value_pairs: &[(H256, U256)],
973) -> Result<StorageRoots, SyncError> {
974    use ethrex_trie::{Nibbles, Node};
975
976    let storage_trie = store.open_direct_storage_trie(account_hash, *EMPTY_TRIE_HASH)?;
977    let trie_hash = match storage_trie.db().get(Nibbles::default())? {
978        Some(noderlp) => Node::decode(&noderlp)?
979            .compute_hash(&ethrex_crypto::NativeCrypto)
980            .finalize(&ethrex_crypto::NativeCrypto),
981        None => *EMPTY_TRIE_HASH,
982    };
983    let mut storage_trie = store.open_direct_storage_trie(account_hash, trie_hash)?;
984
985    for (hashed_key, value) in key_value_pairs {
986        if let Err(err) = storage_trie.insert(hashed_key.0.to_vec(), value.encode_to_vec()) {
987            warn!(
988                "Failed to insert hashed key {hashed_key:?} in account hash: {account_hash:?}, err={err:?}"
989            );
990        };
991        METRICS.storage_leaves_inserted.inc();
992    }
993
994    let (_, changes) = storage_trie.collect_changes_since_last_hash(&ethrex_crypto::NativeCrypto);
995
996    Ok((account_hash, changes))
997}
998
999#[cfg(not(feature = "rocksdb"))]
1000async fn insert_accounts(
1001    store: Store,
1002    storage_accounts: &mut AccountStorageRoots,
1003    account_state_snapshots_dir: &Path,
1004    _: &Path,
1005    code_hash_collector: &mut CodeHashCollector,
1006) -> Result<(H256, BTreeSet<H256>), SyncError> {
1007    let mut computed_state_root = *EMPTY_TRIE_HASH;
1008    let snapshot_files = async_fs::read_dir_paths(account_state_snapshots_dir).await?;
1009    for snapshot_path in snapshot_files {
1010        debug!("Reading account file from {snapshot_path:?}");
1011        let snapshot_contents = async_fs::read_file(&snapshot_path).await?;
1012        let account_states_snapshot: Vec<(H256, AccountState)> =
1013            RLPDecode::decode(&snapshot_contents)
1014                .map_err(|_| SyncError::SnapshotDecodeError(snapshot_path.clone()))?;
1015
1016        storage_accounts.accounts_with_storage_root.extend(
1017            account_states_snapshot.iter().filter_map(|(hash, state)| {
1018                (state.storage_root != *EMPTY_TRIE_HASH)
1019                    .then_some((*hash, (Some(state.storage_root), Vec::new())))
1020            }),
1021        );
1022
1023        // Collect valid code hashes from current account snapshot
1024        let code_hashes_from_snapshot: Vec<H256> = account_states_snapshot
1025            .iter()
1026            .filter_map(|(_, state)| {
1027                (state.code_hash != *EMPTY_KECCAK_HASH).then_some(state.code_hash)
1028            })
1029            .collect();
1030
1031        code_hash_collector.extend(code_hashes_from_snapshot);
1032        code_hash_collector.flush_if_needed().await?;
1033
1034        info!("Inserting accounts into the state trie");
1035
1036        let store_clone = store.clone();
1037        let current_state_root: Result<H256, SyncError> =
1038            tokio::task::spawn_blocking(move || -> Result<H256, SyncError> {
1039                let mut trie = store_clone.open_direct_state_trie(computed_state_root)?;
1040
1041                for (account_hash, account) in account_states_snapshot {
1042                    trie.insert(account_hash.0.to_vec(), account.encode_to_vec())?;
1043                }
1044                info!("Comitting to disk");
1045                let current_state_root = trie.hash(&ethrex_crypto::NativeCrypto)?;
1046                Ok(current_state_root)
1047            })
1048            .await?;
1049
1050        computed_state_root = current_state_root?;
1051    }
1052    async_fs::remove_dir_all(account_state_snapshots_dir).await?;
1053    info!("computed_state_root {computed_state_root}");
1054    Ok((computed_state_root, BTreeSet::new()))
1055}
1056
1057#[cfg(not(feature = "rocksdb"))]
1058async fn insert_storages(
1059    store: Store,
1060    _: BTreeSet<H256>,
1061    account_storages_snapshots_dir: &Path,
1062    _: &Path,
1063) -> Result<(), SyncError> {
1064    use crate::utils::AccountsWithStorage;
1065    use rayon::iter::IntoParallelIterator;
1066
1067    let snapshot_files = async_fs::read_dir_paths(account_storages_snapshots_dir).await?;
1068    for snapshot_path in snapshot_files {
1069        info!("Reading account storage file from {snapshot_path:?}");
1070
1071        let snapshot_contents = async_fs::read_file(&snapshot_path).await?;
1072
1073        #[expect(clippy::type_complexity)]
1074        let account_storages_snapshot: Vec<AccountsWithStorage> =
1075            RLPDecode::decode(&snapshot_contents)
1076                .map(|all_accounts: Vec<(Vec<H256>, Vec<(H256, U256)>)>| {
1077                    all_accounts
1078                        .into_iter()
1079                        .map(|(accounts, storages)| AccountsWithStorage { accounts, storages })
1080                        .collect()
1081                })
1082                .map_err(|_| SyncError::SnapshotDecodeError(snapshot_path.clone()))?;
1083
1084        let store_clone = store.clone();
1085        info!("Starting compute of account_storages_snapshot");
1086        let storage_trie_node_changes = tokio::task::spawn_blocking(move || {
1087            let store: Store = store_clone;
1088
1089            account_storages_snapshot
1090                .into_par_iter()
1091                .flat_map(|account_storages| {
1092                    let storages: Arc<[_]> = account_storages.storages.into();
1093                    account_storages
1094                        .accounts
1095                        .into_par_iter()
1096                        // FIXME: we probably want to make storages an Arc
1097                        .map(move |account| (account, storages.clone()))
1098                })
1099                .map(|(account, storages)| compute_storage_roots(store.clone(), account, &storages))
1100                .collect::<Result<Vec<_>, SyncError>>()
1101        })
1102        .await??;
1103        info!("Writing to db");
1104
1105        store
1106            .write_storage_trie_nodes_batch(storage_trie_node_changes)
1107            .await?;
1108    }
1109
1110    async_fs::remove_dir_all(account_storages_snapshots_dir).await?;
1111
1112    Ok(())
1113}
1114
1115// ============================================================================
1116// Account and Storage Insertion (rocksdb)
1117// ============================================================================
1118
/// Builds the state trie from the account snapshot SST files (rocksdb
/// build).
///
/// The SST files in `account_state_snapshots_dir` are moved into a temporary
/// RocksDB under `datadir`, which is then walked twice:
/// 1. an async pass that hands every non-empty code hash to
///    `code_hash_collector`, flushing as needed;
/// 2. a sync pass that feeds the sorted accounts to the trie builder while
///    recording accounts with a non-empty storage root in
///    `storage_accounts`.
///
/// Afterwards the temporary DB is closed and both directories are removed.
///
/// Returns the computed state root and the set of account hashes currently
/// recorded in `storage_accounts.accounts_with_storage_root`.
///
/// # Errors
/// Fails if the temporary DB cannot be opened or ingested into, on RocksDB
/// iteration errors in either pass, on undecodable account state, on trie
/// generation errors, and on I/O or collector flush errors. Errors in the
/// second pass are returned to the caller rather than panicking; the trie
/// builder may already have consumed earlier entries by then.
#[cfg(feature = "rocksdb")]
async fn insert_accounts(
    store: Store,
    storage_accounts: &mut AccountStorageRoots,
    account_state_snapshots_dir: &Path,
    datadir: &Path,
    code_hash_collector: &mut CodeHashCollector,
) -> Result<(H256, BTreeSet<H256>), SyncError> {
    use crate::utils::get_rocksdb_temp_accounts_dir;
    use ethrex_trie::trie_sorted::trie_from_sorted_accounts_wrap;

    let trie = store.open_direct_state_trie(*EMPTY_TRIE_HASH)?;
    let mut db_options = rocksdb::Options::default();
    db_options.create_if_missing(true);
    let db = rocksdb::DB::open(&db_options, get_rocksdb_temp_accounts_dir(datadir))
        .map_err(|e| SyncError::AccountTempDBDirNotFound(e.to_string()))?;
    let file_paths: Vec<PathBuf> = async_fs::read_dir_paths(account_state_snapshots_dir).await?;
    // Move SST files into the temp DB instead of copying them. The snapshot dir
    // and the temp DB live under the same datadir, so rename succeeds and we
    // avoid keeping two on-disk copies of the leaf data during ingest.
    let mut ingest_opts = rocksdb::IngestExternalFileOptions::default();
    ingest_opts.set_move_files(true);
    db.ingest_external_file_opts(&ingest_opts, file_paths)
        .map_err(|err| SyncError::RocksDBError(err.into_string()))?;

    // First pass: collect code hashes. This is a separate loop because the
    // collector flush is async and cannot run inside the iterator adaptor
    // used for trie generation below.
    let iter = db.full_iterator(rocksdb::IteratorMode::Start);
    for account in iter {
        let account = account.map_err(|err| SyncError::RocksDBError(err.into_string()))?;
        let account_state = AccountState::decode(&account.1).map_err(SyncError::Rlp)?;
        if account_state.code_hash != *EMPTY_KECCAK_HASH {
            code_hash_collector.add(account_state.code_hash);
            code_hash_collector.flush_if_needed().await?;
        }
    }

    // Second pass: stream the sorted accounts into the trie builder. The
    // builder only accepts a plain iterator, so the first failure is parked
    // in `second_pass_error` and iteration stops; it is surfaced afterwards.
    let mut second_pass_error: Option<SyncError> = None;
    let iter = db.full_iterator(rocksdb::IteratorMode::Start);
    let trie_result = trie_from_sorted_accounts_wrap(
        trie.db(),
        &mut iter.map_while(|entry| {
            let (key, value) = match entry {
                Ok(key_value) => key_value,
                Err(err) => {
                    second_pass_error = Some(SyncError::RocksDBError(err.into_string()));
                    return None;
                }
            };
            let account_state = match AccountState::decode(&value) {
                Ok(state) => state,
                Err(err) => {
                    second_pass_error = Some(SyncError::Rlp(err));
                    return None;
                }
            };
            METRICS
                .account_tries_inserted
                .fetch_add(1, Ordering::Relaxed);
            let account_hash = H256::from_slice(&key);
            if account_state.storage_root != *EMPTY_TRIE_HASH {
                storage_accounts.accounts_with_storage_root.insert(
                    account_hash,
                    (Some(account_state.storage_root), Vec::new()),
                );
            }
            Some((account_hash, value.to_vec()))
        }),
    );
    // An iteration failure takes precedence: the trie result was computed
    // from a truncated stream and must not be trusted.
    if let Some(err) = second_pass_error {
        return Err(err);
    }
    let computed_state_root = trie_result.map_err(SyncError::TrieGenerationError)?;

    drop(db); // close db before removing directory

    async_fs::remove_dir_all(account_state_snapshots_dir).await?;
    async_fs::remove_dir_all(&get_rocksdb_temp_accounts_dir(datadir)).await?;

    let accounts_with_storage =
        BTreeSet::from_iter(storage_accounts.accounts_with_storage_root.keys().copied());
    Ok((computed_state_root, accounts_with_storage))
}
1183
/// Builds the storage tries from the storage snapshot SST files (rocksdb
/// build).
///
/// The SST files in `account_storages_snapshots_dir` are moved into a
/// temporary RocksDB under `datadir`. For every hash in
/// `accounts_with_storage`, a task seeks a snapshot iterator to that
/// account's key prefix and streams its slots into the sorted trie builder.
/// Tasks run on a scoped thread pool; submission is throttled so that at
/// most `thread_count - 1` account tasks are in flight at once.
///
/// Trie generation errors for an individual account are logged and do not
/// abort the run. Afterwards the temporary DB is closed and both directories
/// are removed.
///
/// # Errors
/// Fails if the temporary DB cannot be opened or ingested into, if a storage
/// trie cannot be opened, or on I/O errors.
#[cfg(feature = "rocksdb")]
async fn insert_storages(
    store: Store,
    accounts_with_storage: BTreeSet<H256>,
    account_storages_snapshots_dir: &Path,
    datadir: &Path,
) -> Result<(), SyncError> {
    use crate::utils::get_rocksdb_temp_storage_dir;
    use crossbeam::channel::{bounded, unbounded};
    use ethrex_trie::{
        Nibbles, Node, ThreadPool,
        trie_sorted::{BUFFER_COUNT, SIZE_TO_WRITE_DB, trie_from_sorted_accounts},
    };
    use std::thread::scope;

    /// Yields the `(slot key, value)` entries of a single account, stopping
    /// at the first entry whose 32-byte key prefix differs from `limit`.
    // NOTE(review): assumes every key is exactly 64 bytes (account hash
    // followed by slot hash); other lengths would panic in the slicing or
    // in `H256::from_slice` — confirm against the SST writer.
    struct RocksDBIterator<'a> {
        iter: rocksdb::DBRawIterator<'a>,
        limit: H256,
    }

    impl<'a> Iterator for RocksDBIterator<'a> {
        type Item = (H256, Vec<u8>);

        fn next(&mut self) -> Option<Self::Item> {
            if !self.iter.valid() {
                return None;
            }
            let return_value = match (self.iter.key(), self.iter.value()) {
                (Some(key), Some(value)) => {
                    let hash = H256::from_slice(&key[0..32]);
                    (hash == self.limit)
                        .then(|| (H256::from_slice(&key[32..]), value.to_vec()))
                }
                _ => None,
            };
            self.iter.next();
            return_value
        }
    }

    let mut db_options = rocksdb::Options::default();
    db_options.create_if_missing(true);
    let db = rocksdb::DB::open(&db_options, get_rocksdb_temp_storage_dir(datadir))
        .map_err(|err: rocksdb::Error| SyncError::RocksDBError(err.into_string()))?;
    let file_paths: Vec<PathBuf> = async_fs::read_dir_paths(account_storages_snapshots_dir).await?;
    // Move SST files into the temp DB instead of copying them. The snapshot dir
    // and the temp DB live under the same datadir, so rename succeeds and we
    // avoid keeping two on-disk copies of the leaf data during ingest.
    let mut ingest_opts = rocksdb::IngestExternalFileOptions::default();
    ingest_opts.set_move_files(true);
    db.ingest_external_file_opts(&ingest_opts, file_paths)
        .map_err(|err| SyncError::RocksDBError(err.into_string()))?;
    let snapshot = db.snapshot();

    // Open every storage trie up front; a failure is returned to the caller
    // instead of panicking.
    let account_with_storage_and_tries = accounts_with_storage
        .into_iter()
        .map(|account_hash| -> Result<(H256, Trie), SyncError> {
            let trie = store.open_direct_storage_trie(account_hash, *EMPTY_TRIE_HASH)?;
            Ok((account_hash, trie))
        })
        .collect::<Result<Vec<(H256, Trie)>, SyncError>>()?;

    let (sender, receiver) = unbounded::<()>();
    let mut counter: usize = 0;
    // At least two threads are required: with a single thread the throttle
    // below (`counter >= thread_count - 1`) is already true at `counter == 0`
    // and would block on `recv` before any task had been submitted.
    let thread_count: usize = std::thread::available_parallelism()
        .map_or(8, |cores| cores.get())
        .max(2);

    let (buffer_sender, buffer_receiver) = bounded::<Vec<(Nibbles, Node)>>(BUFFER_COUNT as usize);
    for _ in 0..BUFFER_COUNT {
        let _ = buffer_sender.send(Vec::with_capacity(SIZE_TO_WRITE_DB as usize));
    }

    scope(|scope| {
        let pool: Arc<ThreadPool<'_>> = Arc::new(ThreadPool::new(thread_count, scope));
        for (account_hash, trie) in account_with_storage_and_tries.iter() {
            let sender = sender.clone();
            let buffer_sender = buffer_sender.clone();
            let buffer_receiver = buffer_receiver.clone();
            // Throttle: wait for one in-flight account task to finish before
            // queueing another once the limit is reached.
            if counter >= thread_count - 1 {
                let _ = receiver.recv();
                counter -= 1;
            }
            counter += 1;
            let pool_clone = pool.clone();
            let mut iter = snapshot.raw_iterator();
            let task = Box::new(move || {
                // Seek to the first key of this account: its hash followed
                // by 32 zero bytes.
                let mut buffer: [u8; 64] = [0_u8; 64];
                buffer[..32].copy_from_slice(&account_hash.0);
                iter.seek(buffer);
                let iter = RocksDBIterator {
                    iter,
                    limit: *account_hash,
                };

                // Failures are logged per account and do not stop the other
                // accounts from being processed.
                if let Err(err) = trie_from_sorted_accounts(
                    trie.db(),
                    &mut iter.inspect(|_| METRICS.storage_leaves_inserted.inc()),
                    pool_clone,
                    buffer_sender,
                    buffer_receiver,
                ) {
                    error!(
                        "we found an error while inserting the storage trie for the account {account_hash:x}, err {err}"
                    );
                }
                let _ = sender.send(());
            });
            pool.execute(task);
        }
    });

    // close db before removing directory
    drop(snapshot);
    drop(db);

    async_fs::remove_dir_all(account_storages_snapshots_dir).await?;
    async_fs::remove_dir_all(&get_rocksdb_temp_storage_dir(datadir)).await?;

    Ok(())
}
1320
#[cfg(test)]
mod block_sync_state_tests {
    use super::*;
    use ethrex_storage::EngineType;

    /// Creates a fresh in-memory store for a single test.
    fn in_memory_store() -> Store {
        Store::new("", EngineType::InMemory).expect("in-memory store")
    }

    // If the process crashes after the header-download checkpoint is written
    // but before the headers themselves are stored, the checkpoint refers to
    // a header the store has never seen. Resuming must then fall back to the
    // canonical head rather than hit the same recoverable error on every
    // retry, forever.
    #[tokio::test]
    async fn dangling_header_checkpoint_falls_back_to_canonical_head() {
        let store = in_memory_store();
        let orphan_checkpoint = H256::random();
        store
            .set_header_download_checkpoint(orphan_checkpoint)
            .await
            .expect("write checkpoint");

        let sync_state = SnapBlockSyncState::new(store.clone());
        let resumed_head = sync_state.get_current_head().await.expect("resume head");

        let canonical_head = store
            .get_latest_canonical_block_hash()
            .await
            .expect("read canonical")
            .expect("canonical head");
        assert_eq!(
            resumed_head, canonical_head,
            "resume returned a checkpoint whose header was never stored"
        );
    }

    // A checkpoint whose header is present in the store is resumed as-is.
    #[tokio::test]
    async fn valid_header_checkpoint_is_resumed() {
        let store = in_memory_store();
        let stored_header = BlockHeader::default();
        let checkpoint = stored_header.hash();
        store
            .add_block_headers(vec![stored_header])
            .await
            .expect("store header");
        store
            .set_header_download_checkpoint(checkpoint)
            .await
            .expect("write checkpoint");

        let sync_state = SnapBlockSyncState::new(store.clone());
        let resumed_head = sync_state.get_current_head().await.expect("resume head");
        assert_eq!(resumed_head, checkpoint);
    }
}