1use std::collections::{BTreeSet, HashMap, HashSet};
7use std::path::Path;
8#[cfg(feature = "rocksdb")]
9use std::path::PathBuf;
10use std::sync::Arc;
11use std::sync::atomic::Ordering;
12use std::time::{Duration, SystemTime};
13
14use ethrex_blockchain::Blockchain;
15use ethrex_common::types::{AccountState, BlockHeader, Code};
16use ethrex_common::{
17 H256,
18 constants::{EMPTY_KECCAK_HASH, EMPTY_TRIE_HASH},
19};
20use ethrex_rlp::decode::RLPDecode;
21use ethrex_storage::Store;
22#[cfg(feature = "rocksdb")]
23use ethrex_trie::Trie;
24use rayon::iter::{ParallelBridge, ParallelIterator};
25use tracing::{debug, error, info, warn};
26
27use crate::metrics::{CurrentStepValue, METRICS};
28use crate::peer_handler::PeerHandler;
29use crate::peer_table::PeerTableServerProtocol as _;
30use crate::rlpx::p2p::SUPPORTED_ETH_CAPABILITIES;
31use crate::snap::{
32 async_fs,
33 constants::{
34 BYTECODE_CHUNK_SIZE, MAX_HEADER_FETCH_ATTEMPTS, MIN_FULL_BLOCKS, MISSING_SLOTS_PERCENTAGE,
35 SECONDS_PER_BLOCK, SNAP_LIMIT,
36 },
37 request_account_range, request_bytecodes, request_storage_ranges,
38};
39use crate::sync::code_collector::CodeHashCollector;
40use crate::sync::healing::{heal_state_trie_wrap, heal_storage_trie};
41use crate::utils::{
42 current_unix_time, get_account_state_snapshots_dir, get_account_storages_snapshots_dir,
43 get_code_hashes_snapshots_dir,
44};
45
46use super::{AccountStorageRoots, SyncError};
47
48#[cfg(not(feature = "rocksdb"))]
49use ethrex_common::U256;
50#[cfg(not(feature = "rocksdb"))]
51use ethrex_rlp::encode::RLPEncode;
52
53#[derive(Clone)]
55pub struct SnapBlockSyncState {
56 pub block_hashes: Vec<H256>,
57 store: Store,
58}
59
60impl SnapBlockSyncState {
61 pub fn new(store: Store) -> Self {
62 Self {
63 block_hashes: Vec::new(),
64 store,
65 }
66 }
67
68 pub async fn get_current_head(&self) -> Result<H256, SyncError> {
70 if let Some(head) = self.store.get_header_download_checkpoint().await? {
71 if self.store.get_block_number(head).await?.is_some() {
78 return Ok(head);
79 }
80 warn!(
81 checkpoint = %head,
82 "Header download checkpoint has no stored header; restarting header download from the canonical head"
83 );
84 }
85 self.store
86 .get_latest_canonical_block_hash()
87 .await?
88 .ok_or(SyncError::NoLatestCanonical)
89 }
90
91 pub async fn process_incoming_headers(
93 &mut self,
94 block_headers: impl Iterator<Item = BlockHeader>,
95 ) -> Result<(), SyncError> {
96 let mut block_headers_vec = Vec::with_capacity(block_headers.size_hint().1.unwrap_or(0));
97 let mut block_hashes = Vec::with_capacity(block_headers.size_hint().1.unwrap_or(0));
98 for header in block_headers {
99 block_hashes.push(header.hash());
100 block_headers_vec.push(header);
101 }
102 let checkpoint = *block_hashes.last().ok_or(SyncError::InvalidRangeReceived)?;
103 self.store.add_block_headers(block_headers_vec).await?;
107 self.store
108 .set_header_download_checkpoint(checkpoint)
109 .await?;
110 self.block_hashes.extend_from_slice(&block_hashes);
111 Ok(())
112 }
113}
114
115pub async fn sync_cycle_snap(
117 peers: &mut PeerHandler,
118 blockchain: Arc<Blockchain>,
119 snap_enabled: &std::sync::atomic::AtomicBool,
120 sync_head: H256,
121 store: Store,
122 datadir: &Path,
123 diagnostics: &Arc<tokio::sync::RwLock<super::SyncDiagnostics>>,
124) -> Result<(), SyncError> {
125 let mut block_sync_state = SnapBlockSyncState::new(store.clone());
129 let mut current_head = block_sync_state.get_current_head().await?;
133 let mut current_head_number = store
134 .get_block_number(current_head)
135 .await?
136 .ok_or(SyncError::BlockNumber(current_head))?;
137 {
138 let mut diag = diagnostics.write().await;
139 diag.current_phase = "headers".to_string();
140 diag.sync_mode = "snap".to_string();
141 }
142 debug!(
143 "Syncing from current head {:?} to sync_head {:?}",
144 current_head, sync_head
145 );
146 let pending_block = match store.get_pending_block(sync_head).await {
147 Ok(res) => res,
148 Err(e) => return Err(e.into()),
149 };
150
151 let mut attempts = 0;
152
153 loop {
154 let _ = peers.peer_table.prune_table();
156
157 debug!("Requesting Block Headers from {current_head}");
158
159 let Some(mut block_headers) = peers
160 .request_block_headers(current_head_number, sync_head)
161 .await?
162 else {
163 if attempts >= MAX_HEADER_FETCH_ATTEMPTS {
164 warn!(
165 "Sync failed to find target block header after {attempts} attempts, aborting to wait for a newer sync head"
166 );
167 return Ok(());
168 }
169 attempts += 1;
170 debug!(
171 "Failed to fetch headers for sync head (attempt {attempts}/{MAX_HEADER_FETCH_ATTEMPTS}), retrying in 2s"
172 );
173 tokio::time::sleep(Duration::from_secs(2)).await;
174 continue;
175 };
176 attempts = 0;
178
179 debug!("Sync Log 1: In snap sync");
180 debug!(
181 "Sync Log 2: State block hashes len {}",
182 block_sync_state.block_hashes.len()
183 );
184
185 let (first_block_hash, first_block_number, first_block_parent_hash) =
186 match block_headers.first() {
187 Some(header) => (header.hash(), header.number, header.parent_hash),
188 None => continue,
189 };
190 let (last_block_hash, last_block_number) = match block_headers.last() {
191 Some(header) => (header.hash(), header.number),
192 None => continue,
193 };
194 if first_block_hash == last_block_hash
197 && first_block_hash == current_head
198 && current_head != sync_head
199 {
200 warn!("Sync failed to find target block header, going back to the previous parent");
202 current_head = first_block_parent_hash;
203 continue;
204 }
205
206 debug!(
207 "Received {} block headers| First Number: {} Last Number: {}",
208 block_headers.len(),
209 first_block_number,
210 last_block_number
211 );
212
213 if let Some(ref block) = pending_block
216 && block.header.parent_hash == last_block_hash
217 {
218 block_headers.push(block.header.clone());
219 }
220
221 let mut sync_head_found = false;
223 if let Some(index) = block_headers
224 .iter()
225 .position(|header| header.hash() == sync_head)
226 {
227 sync_head_found = true;
228 block_headers.drain(index + 1..);
229 }
230
231 current_head = last_block_hash;
233 current_head_number = last_block_number;
234
235 let head_found = sync_head_found && store.get_latest_block_number().await? > 0;
237 let head_close_to_0 = last_block_number < MIN_FULL_BLOCKS;
241
242 if head_found || head_close_to_0 {
243 info!("Sync head is found, switching to FullSync");
245 snap_enabled.store(false, Ordering::Relaxed);
246 return super::full::sync_cycle_full(
247 peers,
248 blockchain,
249 tokio_util::sync::CancellationToken::new(),
250 sync_head,
251 store.clone(),
252 diagnostics,
253 )
254 .await;
255 }
256
257 if block_headers.len() > 1 {
259 let block_headers_iter = block_headers.into_iter().skip(1);
260
261 block_sync_state
262 .process_incoming_headers(block_headers_iter)
263 .await?;
264 }
265
266 {
268 let mut diag = diagnostics.write().await;
269 diag.phase_progress.insert(
270 "headers_downloaded".to_string(),
271 block_sync_state.block_hashes.len() as u64,
272 );
273 }
274
275 if sync_head_found {
276 break;
277 };
278 }
279
280 snap_sync(peers, &store, &mut block_sync_state, datadir, diagnostics).await?;
281
282 store.clear_snap_state().await?;
283 snap_enabled.store(false, Ordering::Relaxed);
284
285 Ok(())
286}
287
288pub async fn snap_sync(
290 peers: &mut PeerHandler,
291 store: &Store,
292 block_sync_state: &mut SnapBlockSyncState,
293 datadir: &Path,
294 diagnostics: &Arc<tokio::sync::RwLock<super::SyncDiagnostics>>,
295) -> Result<(), SyncError> {
296 let pivot_hash = block_sync_state
301 .block_hashes
302 .last()
303 .ok_or(SyncError::NoBlockHeaders)?;
304 let mut pivot_header = store
305 .get_block_header_by_hash(*pivot_hash)?
306 .ok_or(SyncError::CorruptDB)?;
307
308 while block_is_stale(&pivot_header) {
309 pivot_header = update_pivot(
310 pivot_header.number,
311 pivot_header.timestamp,
312 peers,
313 block_sync_state,
314 diagnostics,
315 )
316 .await?;
317 }
318 debug!(
319 "Selected block {} as pivot for snap sync",
320 pivot_header.number
321 );
322 {
323 let mut diag = diagnostics.write().await;
324 diag.pivot_block_number = Some(pivot_header.number);
325 diag.pivot_timestamp = Some(pivot_header.timestamp);
326 let pivot_age = current_unix_time().saturating_sub(pivot_header.timestamp);
327 diag.pivot_age_seconds = Some(pivot_age);
328 diag.staleness_threshold_seconds = (SNAP_LIMIT as u64) * SECONDS_PER_BLOCK;
329 diag.sync_mode = "snap".to_string();
330 METRICS
331 .pivot_timestamp
332 .store(pivot_header.timestamp, std::sync::atomic::Ordering::Relaxed);
333 }
334
335 let state_root = pivot_header.state_root;
336 let account_state_snapshots_dir = get_account_state_snapshots_dir(datadir);
337 let account_storages_snapshots_dir = get_account_storages_snapshots_dir(datadir);
338
339 let code_hashes_snapshot_dir = get_code_hashes_snapshots_dir(datadir);
340 async_fs::ensure_dir_exists(&code_hashes_snapshot_dir).await?;
341
342 let mut code_hash_collector: CodeHashCollector =
344 CodeHashCollector::new(code_hashes_snapshot_dir.clone());
345
346 let mut storage_accounts = AccountStorageRoots::default();
347 if !std::env::var("SKIP_START_SNAP_SYNC").is_ok_and(|var| !var.is_empty()) {
348 diagnostics.write().await.current_phase = "account_ranges".to_string();
353 request_account_range(
354 peers,
355 H256::zero(),
356 H256::repeat_byte(0xff),
357 account_state_snapshots_dir.as_ref(),
358 &mut pivot_header,
359 block_sync_state,
360 diagnostics,
361 )
362 .await?;
363 debug!("Finished downloading account ranges from peers");
364
365 {
366 let mut diag = diagnostics.write().await;
367 diag.current_phase = "account_insertion".to_string();
368 diag.phase_progress.insert(
369 "account_ranges_downloaded".to_string(),
370 METRICS
371 .downloaded_account_tries
372 .load(std::sync::atomic::Ordering::Relaxed),
373 );
374 }
375 *METRICS.account_tries_insert_start_time.lock().await = Some(SystemTime::now());
376 METRICS
377 .current_step
378 .set(CurrentStepValue::InsertingAccountRanges);
379 #[allow(unused_variables)]
384 let (computed_state_root, accounts_with_storage) = insert_accounts(
385 store.clone(),
386 &mut storage_accounts,
387 &account_state_snapshots_dir,
388 datadir,
389 &mut code_hash_collector,
390 )
391 .await?;
392 debug!(
393 "Finished inserting account ranges, total storage accounts: {}",
394 storage_accounts.accounts_with_storage_root.len()
395 );
396 *METRICS.account_tries_insert_end_time.lock().await = Some(SystemTime::now());
397
398 debug!("Original state root: {state_root:?}");
399 debug!("Computed state root after request_account_ranges: {computed_state_root:?}");
400
401 diagnostics.write().await.current_phase = "storage_ranges".to_string();
402 *METRICS.storage_tries_download_start_time.lock().await = Some(SystemTime::now());
403 let mut chunk_index = 0_u64;
406 let mut state_leafs_healed = 0_u64;
407 let mut storage_range_request_attempts = 0;
408 loop {
409 while block_is_stale(&pivot_header) {
410 pivot_header = update_pivot(
411 pivot_header.number,
412 pivot_header.timestamp,
413 peers,
414 block_sync_state,
415 diagnostics,
416 )
417 .await?;
418 }
419 if !heal_state_trie_wrap(
422 pivot_header.state_root,
423 store.clone(),
424 peers,
425 calculate_staleness_timestamp(pivot_header.timestamp),
426 &mut state_leafs_healed,
427 &mut storage_accounts,
428 &mut code_hash_collector,
429 )
430 .await?
431 {
432 continue;
433 };
434
435 debug!(
436 "Started request_storage_ranges with {} accounts with storage root unchanged",
437 storage_accounts.accounts_with_storage_root.len()
438 );
439 storage_range_request_attempts += 1;
440 if storage_range_request_attempts < 5 {
441 chunk_index = request_storage_ranges(
442 peers,
443 &mut storage_accounts,
444 account_storages_snapshots_dir.as_ref(),
445 chunk_index,
446 &mut pivot_header,
447 store.clone(),
448 )
449 .await?;
450 } else {
451 for (acc_hash, (maybe_root, old_intervals)) in
452 storage_accounts.accounts_with_storage_root.iter()
453 {
454 storage_accounts.healed_accounts.insert(*acc_hash);
459 debug!(
460 "We couldn't download these accounts on request_storage_ranges. Falling back to storage healing for it.
461 Account hash: {:x?}, {:x?}. Number of intervals {}",
462 acc_hash,
463 maybe_root,
464 old_intervals.len()
465 );
466 }
467
468 warn!(
469 "Storage could not be downloaded after multiple attempts. Marking for healing. This could impact snap sync time (healing may take a while)."
470 );
471
472 storage_accounts.accounts_with_storage_root.clear();
473 }
474
475 debug!(
476 "Ended request_storage_ranges with {} accounts with storage root unchanged and not downloaded yet and with {} big/healed accounts",
477 storage_accounts.accounts_with_storage_root.len(),
478 storage_accounts.healed_accounts.len(),
481 );
482 if !block_is_stale(&pivot_header) {
483 break;
484 }
485 debug!("Pivot became stale during storage download, restarting loop");
486 }
487 debug!("Finished request_storage_ranges");
488 *METRICS.storage_tries_download_end_time.lock().await = Some(SystemTime::now());
489
490 diagnostics.write().await.current_phase = "storage_insertion".to_string();
491 *METRICS.storage_tries_insert_start_time.lock().await = Some(SystemTime::now());
492 METRICS
493 .current_step
494 .set(CurrentStepValue::InsertingStorageRanges);
495 let account_storages_snapshots_dir = get_account_storages_snapshots_dir(datadir);
496
497 insert_storages(
498 store.clone(),
499 accounts_with_storage,
500 &account_storages_snapshots_dir,
501 datadir,
502 )
503 .await?;
504
505 *METRICS.storage_tries_insert_end_time.lock().await = Some(SystemTime::now());
506
507 debug!("Finished storing storage tries");
508 }
509
510 diagnostics.write().await.current_phase = "healing".to_string();
511 *METRICS.heal_start_time.lock().await = Some(SystemTime::now());
512 debug!("Starting healing process");
513 let mut global_state_leafs_healed: u64 = 0;
514 let mut global_storage_leafs_healed: u64 = 0;
515 let mut healing_done = false;
516 while !healing_done {
517 if block_is_stale(&pivot_header) {
519 pivot_header = update_pivot(
520 pivot_header.number,
521 pivot_header.timestamp,
522 peers,
523 block_sync_state,
524 diagnostics,
525 )
526 .await?;
527 }
528 healing_done = heal_state_trie_wrap(
529 pivot_header.state_root,
530 store.clone(),
531 peers,
532 calculate_staleness_timestamp(pivot_header.timestamp),
533 &mut global_state_leafs_healed,
534 &mut storage_accounts,
535 &mut code_hash_collector,
536 )
537 .await?;
538 if !healing_done {
539 continue;
540 }
541 healing_done = heal_storage_trie(
542 pivot_header.state_root,
543 &storage_accounts,
544 peers,
545 store.clone(),
546 HashMap::new(),
547 calculate_staleness_timestamp(pivot_header.timestamp),
548 &mut global_storage_leafs_healed,
549 )
550 .await?;
551 }
552 *METRICS.heal_end_time.lock().await = Some(SystemTime::now());
553
554 store.generate_flatkeyvalue()?;
555
556 debug_assert!(validate_state_root(store.clone(), pivot_header.state_root).await);
557 debug_assert!(validate_storage_root(store.clone(), pivot_header.state_root).await);
558
559 debug!("Finished healing");
560
561 code_hash_collector.finish().await?;
563
564 *METRICS.bytecode_download_start_time.lock().await = Some(SystemTime::now());
565
566 let code_hashes_dir = get_code_hashes_snapshots_dir(datadir);
567 let mut seen_code_hashes = HashSet::new();
568 let mut code_hashes_to_download = Vec::new();
569
570 diagnostics.write().await.current_phase = "bytecodes".to_string();
571 debug!("Starting download code hashes from peers");
572 let code_hash_files = async_fs::read_dir_paths(&code_hashes_dir).await?;
573 for file_path in code_hash_files {
574 let snapshot_contents = async_fs::read_file(&file_path).await?;
575 let code_hashes: Vec<H256> = RLPDecode::decode(&snapshot_contents)
576 .map_err(|_| SyncError::CodeHashesSnapshotDecodeError(file_path))?;
577
578 for hash in code_hashes {
579 if seen_code_hashes.insert(hash) {
581 code_hashes_to_download.push(hash);
582
583 if code_hashes_to_download.len() >= BYTECODE_CHUNK_SIZE {
584 debug!(
585 "Starting bytecode download of {} hashes",
586 code_hashes_to_download.len()
587 );
588 let bytecodes = request_bytecodes(peers, &code_hashes_to_download)
589 .await?
590 .ok_or(SyncError::BytecodesNotFound)?;
591
592 store
593 .write_account_code_batch(
594 code_hashes_to_download
595 .drain(..)
596 .zip(bytecodes)
597 .map(|(hash, code)| {
599 (hash, Code::from_bytecode_unchecked(code, hash))
600 })
601 .collect(),
602 )
603 .await?;
604 }
605 }
606 }
607 }
608
609 if !code_hashes_to_download.is_empty() {
611 let bytecodes = request_bytecodes(peers, &code_hashes_to_download)
612 .await?
613 .ok_or(SyncError::BytecodesNotFound)?;
614 store
615 .write_account_code_batch(
616 code_hashes_to_download
617 .drain(..)
618 .zip(bytecodes)
619 .map(|(hash, code)| (hash, Code::from_bytecode_unchecked(code, hash)))
621 .collect(),
622 )
623 .await?;
624 }
625
626 async_fs::remove_dir_all(&code_hashes_dir).await?;
627
628 *METRICS.bytecode_download_end_time.lock().await = Some(SystemTime::now());
629
630 debug_assert!(validate_bytecodes(store.clone(), pivot_header.state_root));
631
632 store_block_bodies(vec![pivot_header.clone()], peers.clone(), store.clone()).await?;
633
634 let block = store
635 .get_block_by_hash(pivot_header.hash())
636 .await?
637 .ok_or(SyncError::CorruptDB)?;
638
639 store.add_block(block).await?;
640
641 let numbers_and_hashes = block_sync_state
642 .block_hashes
643 .iter()
644 .rev()
645 .enumerate()
646 .map(|(i, hash)| (pivot_header.number - i as u64, *hash))
647 .collect::<Vec<_>>();
648
649 store
650 .forkchoice_update(
651 numbers_and_hashes,
652 pivot_header.number,
653 pivot_header.hash(),
654 None,
655 None,
656 )
657 .await?;
658 Ok(())
659}
660
661pub async fn store_block_bodies(
663 mut block_headers: Vec<BlockHeader>,
664 mut peers: PeerHandler,
665 store: Store,
666) -> Result<(), SyncError> {
667 loop {
668 debug!("Requesting Block Bodies ");
669 if let Some(block_bodies) = peers.request_block_bodies(&block_headers).await? {
670 debug!(" Received {} Block Bodies", block_bodies.len());
671 let current_block_headers = block_headers.drain(..block_bodies.len());
673 for (hash, body) in current_block_headers
675 .map(|h| h.hash())
676 .zip(block_bodies.into_iter())
677 {
678 store.add_block_body(hash, body).await?;
679 }
680
681 if block_headers.is_empty() {
683 break;
684 }
685 }
686 }
687 Ok(())
688}
689
/// Picks a new snap-sync pivot after the current one (`block_number`,
/// `block_timestamp`) became stale. It downloads the headers between the old and
/// new pivot into `block_sync_state` and returns the new pivot header.
///
/// The target number is extrapolated from the time elapsed since the old pivot:
/// elapsed seconds / `SECONDS_PER_BLOCK`, scaled by `MISSING_SLOTS_PERCENTAGE`.
///
/// Peers are tried one at a time. A peer that answers with no header, or fails
/// with a recoverable error, is excluded for the rest of the current "rotation".
/// When no non-excluded peer can be selected, the rotation ends, the exclusion
/// list is cleared, and `rotation_count` is incremented. Once at least one
/// rotation has completed, every further attempt is preceded by a backoff of
/// `2^rotation_count` seconds (the shift is capped at 4, i.e. 16s, and the delay
/// at `MAX_RETRY_DELAY`).
///
/// If no peer is eligible at all, or eligible peers exist but none is selectable
/// and none has been excluded yet, the function sleeps 1s and retries without
/// counting a rotation. It can therefore wait indefinitely in those states.
///
/// Every terminal outcome is pushed into `diagnostics`; with the `metrics`
/// feature, outcomes are also counted in the sync metrics.
///
/// # Errors
/// - `SyncError::PeerHandler(PeerHandlerError::BlockHeaders)` after
///   `MAX_ROTATIONS` completed rotations.
/// - `SyncError::PeerHandler` for a non-recoverable peer error.
/// - `SyncError::NoBlockHeaders` if the headers up to the new pivot cannot be
///   fetched.
/// - Peer-table and store errors are propagated.
pub async fn update_pivot(
    block_number: u64,
    block_timestamp: u64,
    peers: &mut PeerHandler,
    block_sync_state: &mut SnapBlockSyncState,
    diagnostics: &Arc<tokio::sync::RwLock<super::SyncDiagnostics>>,
) -> Result<BlockHeader, SyncError> {
    // Number of full peer rotations tolerated before giving up.
    const MAX_ROTATIONS: u64 = 5;
    const INITIAL_RETRY_DELAY: Duration = Duration::from_secs(1);
    const MAX_RETRY_DELAY: Duration = Duration::from_secs(30);

    // Advance the pivot by the number of slots elapsed since the old pivot,
    // discounted by the expected share of missed slots.
    let new_pivot_block_number = block_number
        + ((current_unix_time().saturating_sub(block_timestamp) / SECONDS_PER_BLOCK) as f64
            * MISSING_SLOTS_PERCENTAGE) as u64;
    debug!(
        "Current pivot is stale (number: {}, timestamp: {}). New pivot number: {}",
        block_number, block_timestamp, new_pivot_block_number
    );

    let mut rotation_count: u64 = 0;
    // Peers that already failed during the current rotation.
    let mut excluded_peers: Vec<H256> = Vec::new();

    loop {
        if rotation_count >= MAX_ROTATIONS {
            #[cfg(feature = "metrics")]
            ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("max_failures");
            diagnostics
                .write()
                .await
                .push_pivot_change(super::PivotChangeEvent {
                    timestamp: current_unix_time(),
                    old_pivot_number: block_number,
                    new_pivot_number: new_pivot_block_number,
                    outcome: "max_failures".to_string(),
                    failure_reason: Some(format!("Exhausted {MAX_ROTATIONS} full rotations")),
                });
            return Err(SyncError::PeerHandler(
                crate::peer_handler::PeerHandlerError::BlockHeaders,
            ));
        }

        // Exponential backoff once at least one rotation has been exhausted.
        if rotation_count > 0 {
            let delay = INITIAL_RETRY_DELAY.saturating_mul(1 << rotation_count.min(4));
            let delay = delay.min(MAX_RETRY_DELAY);
            debug!(
                "update_pivot: backing off for {}s (rotation={rotation_count})",
                delay.as_secs()
            );
            tokio::time::sleep(delay).await;
        }

        let Some((peer_id, mut connection, permit)) = peers
            .peer_table
            .get_best_peer_excluding(SUPPORTED_ETH_CAPABILITIES.to_vec(), excluded_peers.clone())
            .await?
        else {
            // No selectable peer: distinguish "nobody eligible", "eligible but
            // currently unselectable", and "everyone eligible already tried".
            let any_eligible = peers
                .peer_table
                .has_eligible_peer(SUPPORTED_ETH_CAPABILITIES.to_vec())
                .await?;

            if !any_eligible {
                debug!("update_pivot: no eligible peers available, waiting");
                #[cfg(feature = "metrics")]
                ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("no_peers");
                tokio::time::sleep(Duration::from_secs(1)).await;
            } else if excluded_peers.is_empty() {
                debug!("update_pivot: peers exist but none selectable, retrying");
                tokio::time::sleep(Duration::from_secs(1)).await;
            } else {
                // Every selectable peer was tried: start a new rotation.
                debug!(
                    "update_pivot: rotation {rotation_count} complete ({} peers tried), starting next",
                    excluded_peers.len()
                );
                excluded_peers.clear();
                rotation_count = rotation_count.saturating_add(1);
            }
            continue;
        };

        let peer_score = peers.peer_table.get_score(peer_id).await?;
        let diag = peers.read_peer_diagnostics().await;
        let eligible_count = diag.iter().filter(|p| p.eligible).count();
        let total_count = diag.len();
        debug!(
            eligible_peers = eligible_count,
            total_peers = total_count,
            selected_peer = %peer_id,
            peer_score = peer_score,
            excluded_count = excluded_peers.len(),
            rotation = rotation_count,
            "update_pivot: attempting with peer"
        );
        debug!(
            "Trying to update pivot to {new_pivot_block_number} with peer {peer_id} (score: {peer_score})"
        );

        let outcome = peers
            .get_block_header(&mut connection, permit, new_pivot_block_number)
            .await;

        match outcome {
            Ok(Some(pivot)) => {
                peers.peer_table.record_success(peer_id)?;
                #[cfg(feature = "metrics")]
                ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("success");
                info!("Snap sync pivot updated to block {}", pivot.number);

                {
                    let mut diag = diagnostics.write().await;
                    diag.push_pivot_change(super::PivotChangeEvent {
                        timestamp: current_unix_time(),
                        old_pivot_number: block_number,
                        new_pivot_number: pivot.number,
                        outcome: "success".to_string(),
                        failure_reason: None,
                    });
                    diag.pivot_block_number = Some(pivot.number);
                    diag.pivot_timestamp = Some(pivot.timestamp);
                    let pivot_age = current_unix_time().saturating_sub(pivot.timestamp);
                    diag.pivot_age_seconds = Some(pivot_age);
                    METRICS
                        .pivot_timestamp
                        .store(pivot.timestamp, std::sync::atomic::Ordering::Relaxed);
                }
                // Fill the gap between the old pivot (exclusive) and the new one.
                let block_headers = peers
                    .request_block_headers(block_number + 1, pivot.hash())
                    .await?
                    .ok_or(SyncError::NoBlockHeaders)?;
                block_sync_state
                    .process_incoming_headers(block_headers.into_iter())
                    .await?;
                *METRICS.sync_head_hash.lock().await = pivot.hash();
                return Ok(pivot);
            }
            Ok(None) => {
                peers.peer_table.record_failure(peer_id)?;
                let peer_score = peers.peer_table.get_score(peer_id).await?;
                debug!(
                    "update_pivot: peer {peer_id} returned None (score: {peer_score}), excluding for this rotation"
                );
                #[cfg(feature = "metrics")]
                ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("peer_none");
                excluded_peers.push(peer_id);
            }
            Err(e) if e.is_recoverable() => {
                peers.peer_table.record_failure(peer_id)?;
                debug!("update_pivot: peer {peer_id} failed with {e}, excluding for this rotation");
                #[cfg(feature = "metrics")]
                ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("peer_error");
                excluded_peers.push(peer_id);
            }
            Err(e) => {
                return Err(SyncError::PeerHandler(e));
            }
        }
    }
}
865
866pub fn block_is_stale(block_header: &BlockHeader) -> bool {
867 let threshold = calculate_staleness_timestamp(block_header.timestamp);
868 let now = current_unix_time();
869 let is_stale = threshold < now;
870 if is_stale {
871 let pivot_age = now.saturating_sub(block_header.timestamp);
872 let staleness_limit = (SNAP_LIMIT as u64) * SECONDS_PER_BLOCK;
873 debug!(
874 pivot_number = block_header.number,
875 pivot_timestamp = block_header.timestamp,
876 pivot_age_seconds = pivot_age,
877 staleness_threshold_seconds = staleness_limit,
878 "Pivot block detected as stale"
879 );
880 }
881 is_stale
882}
883
884pub fn calculate_staleness_timestamp(timestamp: u64) -> u64 {
885 timestamp + (SNAP_LIMIT as u64 * 12)
886}
887
888pub async fn validate_state_root(store: Store, state_root: H256) -> bool {
889 info!("Starting validate_state_root");
890 let validated = tokio::task::spawn_blocking(move || {
891 store
892 .open_locked_state_trie(state_root)
893 .expect("couldn't open trie")
894 .validate()
895 })
896 .await
897 .expect("We should be able to create threads");
898
899 if validated.is_ok() {
900 info!("Successfully validated tree, {state_root} found");
901 } else {
902 error!("We have failed the validation of the state tree");
903 std::process::exit(1);
904 }
905 validated.is_ok()
906}
907
908pub async fn validate_storage_root(store: Store, state_root: H256) -> bool {
909 info!("Starting validate_storage_root");
910 let is_valid = tokio::task::spawn_blocking(move || {
911 store
912 .iter_accounts(state_root)
913 .expect("couldn't iterate accounts")
914 .par_bridge()
915 .try_for_each(|(hashed_address, account_state)| {
916 let store_clone = store.clone();
917 store_clone
918 .open_locked_storage_trie(
919 hashed_address,
920 state_root,
921 account_state.storage_root,
922 )
923 .expect("couldn't open storage trie")
924 .validate()
925 })
926 })
927 .await
928 .expect("We should be able to create threads");
929 info!("Finished validate_storage_root");
930 if is_valid.is_err() {
931 std::process::exit(1);
932 }
933 is_valid.is_ok()
934}
935
936pub fn validate_bytecodes(store: Store, state_root: H256) -> bool {
937 info!("Starting validate_bytecodes");
938 let mut is_valid = true;
939 for (account_hash, account_state) in store
940 .iter_accounts(state_root)
941 .expect("we couldn't iterate over accounts")
942 {
943 if account_state.code_hash != *EMPTY_KECCAK_HASH
944 && !store
945 .get_account_code(account_state.code_hash)
946 .is_ok_and(|code| code.is_some())
947 {
948 error!(
949 "Missing code hash {:x} for account {:x}",
950 account_state.code_hash, account_hash
951 );
952 is_valid = false
953 }
954 }
955 if !is_valid {
956 std::process::exit(1);
957 }
958 is_valid
959}
960
961#[cfg(not(feature = "rocksdb"))]
966type StorageRoots = (H256, Vec<(ethrex_trie::Nibbles, Vec<u8>)>);
967
968#[cfg(not(feature = "rocksdb"))]
969fn compute_storage_roots(
970 store: Store,
971 account_hash: H256,
972 key_value_pairs: &[(H256, U256)],
973) -> Result<StorageRoots, SyncError> {
974 use ethrex_trie::{Nibbles, Node};
975
976 let storage_trie = store.open_direct_storage_trie(account_hash, *EMPTY_TRIE_HASH)?;
977 let trie_hash = match storage_trie.db().get(Nibbles::default())? {
978 Some(noderlp) => Node::decode(&noderlp)?
979 .compute_hash(ðrex_crypto::NativeCrypto)
980 .finalize(ðrex_crypto::NativeCrypto),
981 None => *EMPTY_TRIE_HASH,
982 };
983 let mut storage_trie = store.open_direct_storage_trie(account_hash, trie_hash)?;
984
985 for (hashed_key, value) in key_value_pairs {
986 if let Err(err) = storage_trie.insert(hashed_key.0.to_vec(), value.encode_to_vec()) {
987 warn!(
988 "Failed to insert hashed key {hashed_key:?} in account hash: {account_hash:?}, err={err:?}"
989 );
990 };
991 METRICS.storage_leaves_inserted.inc();
992 }
993
994 let (_, changes) = storage_trie.collect_changes_since_last_hash(ðrex_crypto::NativeCrypto);
995
996 Ok((account_hash, changes))
997}
998
999#[cfg(not(feature = "rocksdb"))]
1000async fn insert_accounts(
1001 store: Store,
1002 storage_accounts: &mut AccountStorageRoots,
1003 account_state_snapshots_dir: &Path,
1004 _: &Path,
1005 code_hash_collector: &mut CodeHashCollector,
1006) -> Result<(H256, BTreeSet<H256>), SyncError> {
1007 let mut computed_state_root = *EMPTY_TRIE_HASH;
1008 let snapshot_files = async_fs::read_dir_paths(account_state_snapshots_dir).await?;
1009 for snapshot_path in snapshot_files {
1010 debug!("Reading account file from {snapshot_path:?}");
1011 let snapshot_contents = async_fs::read_file(&snapshot_path).await?;
1012 let account_states_snapshot: Vec<(H256, AccountState)> =
1013 RLPDecode::decode(&snapshot_contents)
1014 .map_err(|_| SyncError::SnapshotDecodeError(snapshot_path.clone()))?;
1015
1016 storage_accounts.accounts_with_storage_root.extend(
1017 account_states_snapshot.iter().filter_map(|(hash, state)| {
1018 (state.storage_root != *EMPTY_TRIE_HASH)
1019 .then_some((*hash, (Some(state.storage_root), Vec::new())))
1020 }),
1021 );
1022
1023 let code_hashes_from_snapshot: Vec<H256> = account_states_snapshot
1025 .iter()
1026 .filter_map(|(_, state)| {
1027 (state.code_hash != *EMPTY_KECCAK_HASH).then_some(state.code_hash)
1028 })
1029 .collect();
1030
1031 code_hash_collector.extend(code_hashes_from_snapshot);
1032 code_hash_collector.flush_if_needed().await?;
1033
1034 info!("Inserting accounts into the state trie");
1035
1036 let store_clone = store.clone();
1037 let current_state_root: Result<H256, SyncError> =
1038 tokio::task::spawn_blocking(move || -> Result<H256, SyncError> {
1039 let mut trie = store_clone.open_direct_state_trie(computed_state_root)?;
1040
1041 for (account_hash, account) in account_states_snapshot {
1042 trie.insert(account_hash.0.to_vec(), account.encode_to_vec())?;
1043 }
1044 info!("Comitting to disk");
1045 let current_state_root = trie.hash(ðrex_crypto::NativeCrypto)?;
1046 Ok(current_state_root)
1047 })
1048 .await?;
1049
1050 computed_state_root = current_state_root?;
1051 }
1052 async_fs::remove_dir_all(account_state_snapshots_dir).await?;
1053 info!("computed_state_root {computed_state_root}");
1054 Ok((computed_state_root, BTreeSet::new()))
1055}
1056
1057#[cfg(not(feature = "rocksdb"))]
1058async fn insert_storages(
1059 store: Store,
1060 _: BTreeSet<H256>,
1061 account_storages_snapshots_dir: &Path,
1062 _: &Path,
1063) -> Result<(), SyncError> {
1064 use crate::utils::AccountsWithStorage;
1065 use rayon::iter::IntoParallelIterator;
1066
1067 let snapshot_files = async_fs::read_dir_paths(account_storages_snapshots_dir).await?;
1068 for snapshot_path in snapshot_files {
1069 info!("Reading account storage file from {snapshot_path:?}");
1070
1071 let snapshot_contents = async_fs::read_file(&snapshot_path).await?;
1072
1073 #[expect(clippy::type_complexity)]
1074 let account_storages_snapshot: Vec<AccountsWithStorage> =
1075 RLPDecode::decode(&snapshot_contents)
1076 .map(|all_accounts: Vec<(Vec<H256>, Vec<(H256, U256)>)>| {
1077 all_accounts
1078 .into_iter()
1079 .map(|(accounts, storages)| AccountsWithStorage { accounts, storages })
1080 .collect()
1081 })
1082 .map_err(|_| SyncError::SnapshotDecodeError(snapshot_path.clone()))?;
1083
1084 let store_clone = store.clone();
1085 info!("Starting compute of account_storages_snapshot");
1086 let storage_trie_node_changes = tokio::task::spawn_blocking(move || {
1087 let store: Store = store_clone;
1088
1089 account_storages_snapshot
1090 .into_par_iter()
1091 .flat_map(|account_storages| {
1092 let storages: Arc<[_]> = account_storages.storages.into();
1093 account_storages
1094 .accounts
1095 .into_par_iter()
1096 .map(move |account| (account, storages.clone()))
1098 })
1099 .map(|(account, storages)| compute_storage_roots(store.clone(), account, &storages))
1100 .collect::<Result<Vec<_>, SyncError>>()
1101 })
1102 .await??;
1103 info!("Writing to db");
1104
1105 store
1106 .write_storage_trie_nodes_batch(storage_trie_node_changes)
1107 .await?;
1108 }
1109
1110 async_fs::remove_dir_all(account_storages_snapshots_dir).await?;
1111
1112 Ok(())
1113}
1114
/// Builds the state trie from the downloaded account-range snapshots (RocksDB build).
///
/// The snapshot files in `account_state_snapshots_dir` are ingested (moved) into a
/// temporary RocksDB instance located at `get_rocksdb_temp_accounts_dir(datadir)`, whose
/// iterator yields the accounts in key order. That ordered stream is read twice:
/// 1. every code hash different from the empty keccak hash is fed to
///    `code_hash_collector` (flushing it when it reports it needs to);
/// 2. the state trie is built with `trie_from_sorted_accounts_wrap`, and every account
///    whose storage root is not the empty trie root is recorded in
///    `storage_accounts.accounts_with_storage_root`.
///
/// An empty snapshot directory is treated as "nothing to ingest". RocksDB rejects an
/// ingestion call with an empty file list, so ingestion is skipped in that case.
///
/// The snapshot directory and the temporary database directory are removed afterwards.
///
/// Returns the computed state root together with every account hash present in
/// `storage_accounts.accounts_with_storage_root` after the trie has been built.
///
/// # Errors
/// - `SyncError::AccountTempDBDirNotFound` if the temporary database cannot be opened.
/// - `SyncError::RocksDBError` if ingestion or the first pass over the database fails.
/// - `SyncError::Rlp` if an account cannot be decoded during the first pass.
/// - `SyncError::TrieGenerationError` if building the trie fails.
/// - Any error from opening the state trie, listing or removing directories, or
///   flushing the code-hash collector.
///
/// # Panics
/// Panics if the second pass over the temporary database hits a RocksDB read error or an
/// undecodable account. The first pass has already read and decoded every entry, so this
/// is treated as an invariant violation.
#[cfg(feature = "rocksdb")]
async fn insert_accounts(
    store: Store,
    storage_accounts: &mut AccountStorageRoots,
    account_state_snapshots_dir: &Path,
    datadir: &Path,
    code_hash_collector: &mut CodeHashCollector,
) -> Result<(H256, BTreeSet<H256>), SyncError> {
    use crate::utils::get_rocksdb_temp_accounts_dir;
    use ethrex_trie::trie_sorted::trie_from_sorted_accounts_wrap;

    let trie = store.open_direct_state_trie(*EMPTY_TRIE_HASH)?;
    let mut db_options = rocksdb::Options::default();
    db_options.create_if_missing(true);
    let db = rocksdb::DB::open(&db_options, get_rocksdb_temp_accounts_dir(datadir))
        .map_err(|e| SyncError::AccountTempDBDirNotFound(e.to_string()))?;
    let file_paths: Vec<PathBuf> = async_fs::read_dir_paths(account_state_snapshots_dir).await?;
    // RocksDB answers an ingestion request with an empty file list with InvalidArgument.
    // Having no snapshot files simply means there is nothing to ingest.
    if !file_paths.is_empty() {
        let mut ingest_opts = rocksdb::IngestExternalFileOptions::default();
        ingest_opts.set_move_files(true);
        db.ingest_external_file_opts(&ingest_opts, file_paths)
            .map_err(|err| SyncError::RocksDBError(err.into_string()))?;
    }

    // First pass: hand every non-empty code hash to the collector.
    let iter = db.full_iterator(rocksdb::IteratorMode::Start);
    for account in iter {
        let account = account.map_err(|err| SyncError::RocksDBError(err.into_string()))?;
        let account_state = AccountState::decode(&account.1).map_err(SyncError::Rlp)?;
        if account_state.code_hash != *EMPTY_KECCAK_HASH {
            code_hash_collector.add(account_state.code_hash);
            code_hash_collector.flush_if_needed().await?;
        }
    }

    // Second pass: build the state trie from the key-ordered accounts while remembering
    // which accounts carry a non-empty storage root. The iterator is a temporary of this
    // statement, so its borrow of `db` ends before `drop(db)` below.
    let compute_state_root = trie_from_sorted_accounts_wrap(
        trie.db(),
        &mut db
            .full_iterator(rocksdb::IteratorMode::Start)
            .map(|k| k.expect("We shouldn't have a rocksdb error here"))
            .inspect(|(k, v)| {
                METRICS
                    .account_tries_inserted
                    .fetch_add(1, Ordering::Relaxed);
                let account_state = AccountState::decode(v).expect("We should have accounts here");
                if account_state.storage_root != *EMPTY_TRIE_HASH {
                    storage_accounts.accounts_with_storage_root.insert(
                        H256::from_slice(k),
                        (Some(account_state.storage_root), Vec::new()),
                    );
                }
            })
            .map(|(k, v)| (H256::from_slice(&k), v.to_vec())),
    )
    .map_err(SyncError::TrieGenerationError)?;

    // Close the temporary database before its directory is removed below.
    drop(db);
    async_fs::remove_dir_all(account_state_snapshots_dir).await?;
    async_fs::remove_dir_all(&get_rocksdb_temp_accounts_dir(datadir)).await?;

    let accounts_with_storage =
        BTreeSet::from_iter(storage_accounts.accounts_with_storage_root.keys().copied());
    Ok((compute_state_root, accounts_with_storage))
}
1183
/// Builds the storage trie of every account in `accounts_with_storage` from the
/// downloaded storage-range snapshots (RocksDB build).
///
/// The snapshot files in `account_storages_snapshots_dir` are ingested (moved) into a
/// temporary RocksDB instance located at `get_rocksdb_temp_storage_dir(datadir)`. Keys
/// in that database are read as 64 bytes: the first 32 bytes are the account hash and
/// the last 32 bytes the storage slot key. All slots of one account therefore form a
/// contiguous, ordered key range.
///
/// For each account, a task seeks to the start of its range and streams the slots
/// into `trie_from_sorted_accounts`, writing into that account's (initially empty)
/// storage trie. Tasks run on a scoped `ThreadPool` with `thread_count` workers, and at
/// most `thread_count - 1` tasks are in flight at once. Presumably this leaves a worker
/// free for the pool work `trie_from_sorted_accounts` submits itself — TODO confirm.
///
/// A failure while generating an individual storage trie is logged and otherwise
/// ignored; it does not make this function fail.
///
/// An empty snapshot directory is treated as "nothing to ingest". RocksDB rejects an
/// ingestion call with an empty file list, so ingestion is skipped in that case.
///
/// The snapshot directory and the temporary database directory are removed afterwards.
///
/// # Errors
/// Fails if the temporary database cannot be opened, the snapshots cannot be listed or
/// ingested (`SyncError::RocksDBError`), a storage trie cannot be opened in the store, or
/// the directories cannot be removed.
#[cfg(feature = "rocksdb")]
async fn insert_storages(
    store: Store,
    accounts_with_storage: BTreeSet<H256>,
    account_storages_snapshots_dir: &Path,
    datadir: &Path,
) -> Result<(), SyncError> {
    use crate::utils::get_rocksdb_temp_storage_dir;
    use crossbeam::channel::{bounded, unbounded};
    use ethrex_trie::{
        Nibbles, Node, ThreadPool,
        trie_sorted::{BUFFER_COUNT, SIZE_TO_WRITE_DB, trie_from_sorted_accounts},
    };
    use std::thread::scope;

    /// Yields the `(slot key, value)` pairs stored under a single account prefix.
    ///
    /// Returns `None` once the underlying iterator is exhausted or the leading 32 bytes
    /// of the current key differ from `limit`. Keys are assumed to be exactly 64 bytes
    /// long, because `H256::from_slice` panics on any other slice length.
    struct RocksDBIterator<'a> {
        iter: rocksdb::DBRawIterator<'a>,
        limit: H256,
    }

    impl Iterator for RocksDBIterator<'_> {
        type Item = (H256, Vec<u8>);

        fn next(&mut self) -> Option<Self::Item> {
            if !self.iter.valid() {
                return None;
            }
            let return_value = match (self.iter.key(), self.iter.value()) {
                (Some(key), Some(value)) => {
                    let hash = H256::from_slice(&key[0..32]);
                    let key = H256::from_slice(&key[32..]);
                    if hash != self.limit {
                        // Past the end of this account's key range.
                        None
                    } else {
                        Some((key, value.to_vec()))
                    }
                }
                _ => None,
            };
            self.iter.next();
            return_value
        }
    }

    let mut db_options = rocksdb::Options::default();
    db_options.create_if_missing(true);
    let db = rocksdb::DB::open(&db_options, get_rocksdb_temp_storage_dir(datadir))
        .map_err(|err: rocksdb::Error| SyncError::RocksDBError(err.into_string()))?;
    let file_paths: Vec<PathBuf> = async_fs::read_dir_paths(account_storages_snapshots_dir).await?;
    // RocksDB answers an ingestion request with an empty file list with InvalidArgument.
    // Having no snapshot files simply means there is nothing to ingest.
    if !file_paths.is_empty() {
        let mut ingest_opts = rocksdb::IngestExternalFileOptions::default();
        ingest_opts.set_move_files(true);
        db.ingest_external_file_opts(&ingest_opts, file_paths)
            .map_err(|err| SyncError::RocksDBError(err.into_string()))?;
    }
    let snapshot = db.snapshot();

    // Open every storage trie up front; a store failure is reported instead of panicking.
    let account_with_storage_and_tries = accounts_with_storage
        .into_iter()
        .map(|account_hash| -> Result<(H256, Trie), SyncError> {
            let trie = store.open_direct_storage_trie(account_hash, *EMPTY_TRIE_HASH)?;
            Ok((account_hash, trie))
        })
        .collect::<Result<Vec<(H256, Trie)>, SyncError>>()?;

    // Completion signals from tasks, used to throttle dispatching.
    let (sender, receiver) = unbounded::<()>();
    let mut counter = 0;
    // The dispatcher keeps at most `thread_count - 1` tasks in flight. With a single
    // thread that limit is 0, so `receiver.recv()` would block before any task was
    // submitted, while `sender` is still alive — a deadlock. Use at least two threads.
    let thread_count: usize = std::thread::available_parallelism()
        .map(|num| num.get())
        .unwrap_or(8)
        .max(2);

    // Pre-allocated write buffers shared by all trie builders.
    let (buffer_sender, buffer_receiver) = bounded::<Vec<(Nibbles, Node)>>(BUFFER_COUNT as usize);
    for _ in 0..BUFFER_COUNT {
        let _ = buffer_sender.send(Vec::with_capacity(SIZE_TO_WRITE_DB as usize));
    }

    scope(|scope| {
        let pool: Arc<ThreadPool<'_>> = Arc::new(ThreadPool::new(thread_count, scope));
        for (account_hash, trie) in account_with_storage_and_tries.iter() {
            let sender = sender.clone();
            let buffer_sender = buffer_sender.clone();
            let buffer_receiver = buffer_receiver.clone();
            // Throttle: once `thread_count - 1` tasks are in flight, wait for one of them
            // to report completion before dispatching another.
            if counter >= thread_count - 1 {
                let _ = receiver.recv();
                counter -= 1;
            }
            counter += 1;
            let pool_clone = pool.clone();
            let mut iter = snapshot.raw_iterator();
            let task = Box::new(move || {
                // Seek to `account_hash ++ [0u8; 32]`, the smallest key of this account.
                let mut buffer: [u8; 64] = [0_u8; 64];
                buffer[..32].copy_from_slice(&account_hash.0);
                iter.seek(buffer);
                let iter = RocksDBIterator {
                    iter,
                    limit: *account_hash,
                };

                // Best effort: a failed storage trie is logged, not propagated.
                if let Err(err) = trie_from_sorted_accounts(
                    trie.db(),
                    &mut iter.inspect(|_| METRICS.storage_leaves_inserted.inc()),
                    pool_clone,
                    buffer_sender,
                    buffer_receiver,
                ) {
                    error!(
                        "we found an error while inserting the storage trie for the account {account_hash:x}, err {err}"
                    );
                }
                // Let the dispatcher know a slot has been freed.
                let _ = sender.send(());
            });
            pool.execute(task);
        }
    });

    // The snapshot borrows `db`, so it is released first; both are closed before their
    // directories are removed.
    drop(snapshot);
    drop(db);

    async_fs::remove_dir_all(account_storages_snapshots_dir).await?;
    async_fs::remove_dir_all(&get_rocksdb_temp_storage_dir(datadir)).await?;

    Ok(())
}
1320
#[cfg(test)]
mod block_sync_state_tests {
    use super::*;
    use ethrex_storage::EngineType;

    /// Creates a fresh, empty in-memory store for one test case.
    fn in_memory_store() -> Store {
        Store::new("", EngineType::InMemory).expect("in-memory store")
    }

    /// A header-download checkpoint pointing at a header that was never stored must not
    /// be resumed; `get_current_head` is expected to return the canonical head instead.
    #[tokio::test]
    async fn dangling_header_checkpoint_falls_back_to_canonical_head() {
        let store = in_memory_store();
        let never_stored = H256::random();
        store
            .set_header_download_checkpoint(never_stored)
            .await
            .expect("write checkpoint");

        let resumed = SnapBlockSyncState::new(store.clone())
            .get_current_head()
            .await
            .expect("resume head");
        let canonical_head = store
            .get_latest_canonical_block_hash()
            .await
            .expect("read canonical")
            .expect("canonical head");

        assert_eq!(
            resumed, canonical_head,
            "resume returned a checkpoint whose header was never stored"
        );
    }

    /// A checkpoint whose header is present in the store is resumed unchanged.
    #[tokio::test]
    async fn valid_header_checkpoint_is_resumed() {
        let store = in_memory_store();
        let header = BlockHeader::default();
        let expected_head = header.hash();
        store
            .add_block_headers(vec![header])
            .await
            .expect("store header");
        store
            .set_header_download_checkpoint(expected_head)
            .await
            .expect("write checkpoint");

        let sync_state = SnapBlockSyncState::new(store.clone());
        let resumed = sync_state.get_current_head().await.expect("resume head");
        assert_eq!(resumed, expected_head);
    }
}