1mod code_collector;
8mod full;
9mod healing;
10mod snap_sync;
11
12#[cfg(feature = "test-utils")]
15pub use full::{first_resume_point_in_batch, is_resume_point};
16
17use crate::metrics::METRICS;
18use crate::peer_handler::{BlockRequestOrder, HeaderFetchOutcome, PeerHandler, PeerHandlerError};
19use crate::snap::constants::{EXECUTE_BATCH_SIZE_DEFAULT, MIN_FULL_BLOCKS};
20use crate::utils::delete_leaves_folder;
21use ethrex_blockchain::{Blockchain, error::ChainError};
22use ethrex_common::H256;
23use ethrex_rlp::error::RLPDecodeError;
24use ethrex_storage::{Store, error::StoreError};
25use ethrex_trie::TrieError;
26use ethrex_trie::trie_sorted::TrieGenerationError;
27use spawned_concurrency::error::ActorError;
28use std::collections::{BTreeMap, HashSet};
29use std::path::PathBuf;
30use std::sync::{
31 Arc,
32 atomic::{AtomicBool, Ordering},
33};
34use tokio::sync::mpsc::error::SendError;
35use tokio::time::Instant;
36use tokio_util::sync::CancellationToken;
37use tracing::{debug, error, info, warn};
38
39pub use snap_sync::{
41 SnapBlockSyncState, block_is_stale, calculate_staleness_timestamp, update_pivot,
42 validate_bytecodes, validate_state_root, validate_storage_root,
43};
44
45#[cfg(feature = "sync-test")]
46lazy_static::lazy_static! {
47 static ref EXECUTE_BATCH_SIZE: usize = std::env::var("EXECUTE_BATCH_SIZE").map(|var| var.parse().expect("Execute batch size environmental variable is not a number")).unwrap_or(EXECUTE_BATCH_SIZE_DEFAULT);
48}
49#[cfg(not(feature = "sync-test"))]
50lazy_static::lazy_static! {
51 static ref EXECUTE_BATCH_SIZE: usize = EXECUTE_BATCH_SIZE_DEFAULT;
52}
53
/// Strategy used to bring the node up to date with the chain.
///
/// `Full` is the default mode.
#[derive(Clone, Debug, Default, PartialEq)]
pub enum SyncMode {
    /// Full sync (the default).
    #[default]
    Full,
    /// Snap sync.
    Snap,
}
60
61#[derive(Debug, Clone, Default, serde::Serialize)]
63pub struct SyncDiagnostics {
64 pub sync_mode: String,
65 pub current_phase: String,
66 pub executed_head: u64,
72 pub pivot_block_number: Option<u64>,
73 pub pivot_timestamp: Option<u64>,
74 pub pivot_age_seconds: Option<u64>,
75 pub staleness_threshold_seconds: u64,
76 pub phase_progress: std::collections::HashMap<String, u64>,
77 pub recent_pivot_changes: std::collections::VecDeque<PivotChangeEvent>,
78 pub recent_errors: std::collections::VecDeque<SyncErrorEvent>,
79}
80
81#[derive(Debug, Clone, serde::Serialize)]
82pub struct PivotChangeEvent {
83 pub timestamp: u64,
84 pub old_pivot_number: u64,
85 pub new_pivot_number: u64,
86 pub outcome: String,
87 pub failure_reason: Option<String>,
88}
89
90#[derive(Debug, Clone, serde::Serialize)]
91pub struct SyncErrorEvent {
92 pub timestamp: u64,
93 pub error_type: String,
94 pub error_message: String,
95 pub recoverable: bool,
96}
97
98impl SyncDiagnostics {
99 const MAX_PIVOT_CHANGES: usize = 10;
100 const MAX_ERRORS: usize = 20;
101
102 pub fn push_pivot_change(&mut self, event: PivotChangeEvent) {
103 if self.recent_pivot_changes.len() >= Self::MAX_PIVOT_CHANGES {
104 self.recent_pivot_changes.pop_front();
105 }
106 self.recent_pivot_changes.push_back(event);
107 }
108
109 pub fn push_error(&mut self, event: SyncErrorEvent) {
110 if self.recent_errors.len() >= Self::MAX_ERRORS {
111 self.recent_errors.pop_front();
112 }
113 self.recent_errors.push_back(event);
114 }
115}
116
117#[derive(Debug)]
119pub struct Syncer {
120 snap_enabled: Arc<AtomicBool>,
123 peers: PeerHandler,
124 cancel_token: CancellationToken,
126 blockchain: Arc<Blockchain>,
127 datadir: PathBuf,
130 diagnostics: Arc<tokio::sync::RwLock<SyncDiagnostics>>,
131}
132
133impl Syncer {
134 pub fn new(
135 peers: PeerHandler,
136 snap_enabled: Arc<AtomicBool>,
137 cancel_token: CancellationToken,
138 blockchain: Arc<Blockchain>,
139 datadir: PathBuf,
140 diagnostics: Arc<tokio::sync::RwLock<SyncDiagnostics>>,
141 ) -> Self {
142 Self {
143 snap_enabled,
144 peers,
145 cancel_token,
146 blockchain,
147 datadir,
148 diagnostics,
149 }
150 }
151
152 pub async fn start_sync(&mut self, sync_head: H256, store: Store) {
161 let start_time = Instant::now();
162 match self.sync_cycle(sync_head, store).await {
163 Ok(()) => {
164 self.diagnostics.write().await.current_phase = "idle".to_string();
165 info!(
166 time_elapsed_s = start_time.elapsed().as_secs(),
167 %sync_head,
168 "Sync cycle finished successfully",
169 );
170 }
171
172 Err(error) => {
174 let recoverable = error.is_recoverable();
175 self.diagnostics.write().await.current_phase = "idle".to_string();
176 debug!(
177 error_type = %error,
178 recoverable = recoverable,
179 action = if recoverable { "retry" } else { "exit" },
180 "Sync cycle error classification"
181 );
182 self.diagnostics.write().await.push_error(SyncErrorEvent {
183 timestamp: std::time::SystemTime::now()
184 .duration_since(std::time::UNIX_EPOCH)
185 .unwrap_or_default()
186 .as_secs(),
187 error_type: format!("{:?}", std::mem::discriminant(&error)),
188 error_message: error.to_string(),
189 recoverable,
190 });
191 match recoverable {
192 false => {
193 error!(
195 time_elapsed_s = start_time.elapsed().as_secs(),
196 %sync_head,
197 %error, "Sync cycle failed, exiting as the error is irrecoverable",
198 );
199 std::process::exit(2);
200 }
201 true => {
202 warn!(
204 time_elapsed_s = start_time.elapsed().as_secs(),
205 %sync_head,
206 %error, "Sync cycle failed, retrying",
207 );
208 }
209 }
210 }
211 }
212 }
213
214 async fn sync_cycle(&mut self, sync_head: H256, store: Store) -> Result<(), SyncError> {
216 if self.snap_enabled.load(Ordering::Relaxed) {
218 if let Some(sync_head_number) = probe_sync_head_number(&mut self.peers, sync_head).await
227 && sync_head_number < MIN_FULL_BLOCKS
228 {
229 info!(
230 sync_head_number,
231 "Sync head below MIN_FULL_BLOCKS ({MIN_FULL_BLOCKS}), using full sync"
232 );
233 self.snap_enabled.store(false, Ordering::Relaxed);
234 if let Err(e) = store.clear_snap_state().await {
239 warn!("Failed to clear stale snap state: {e}");
240 }
241 return full::sync_cycle_full(
242 &mut self.peers,
243 self.blockchain.clone(),
244 self.cancel_token.clone(),
245 sync_head,
246 store,
247 &self.diagnostics,
248 )
249 .await;
250 }
251 METRICS.enable().await;
252 delete_leaves_folder(&self.datadir);
255 let sync_cycle_result = snap_sync::sync_cycle_snap(
256 &mut self.peers,
257 self.blockchain.clone(),
258 &self.snap_enabled,
259 sync_head,
260 store,
261 &self.datadir,
262 &self.diagnostics,
263 )
264 .await;
265 METRICS.disable().await;
266 sync_cycle_result
267 } else {
268 full::sync_cycle_full(
269 &mut self.peers,
270 self.blockchain.clone(),
271 self.cancel_token.clone(),
272 sync_head,
273 store,
274 &self.diagnostics,
275 )
276 .await
277 }
278 }
279}
280
/// How many times the sync head header is requested before giving up.
const PROBE_SYNC_HEAD_ATTEMPTS: u32 = 3;

/// Pause between two consecutive sync head probe attempts.
const PROBE_SYNC_HEAD_RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(2);
285
286async fn probe_sync_head_number(peers: &mut PeerHandler, sync_head: H256) -> Option<u64> {
298 for attempt in 1..=PROBE_SYNC_HEAD_ATTEMPTS {
299 match peers
300 .request_block_headers_from_hash(sync_head, BlockRequestOrder::NewToOld)
301 .await
302 {
303 Ok(HeaderFetchOutcome::Headers(headers)) => {
304 if let Some(header) = headers.iter().find(|h| h.hash() == sync_head) {
305 return Some(header.number);
306 }
307 debug!("Sync head probe: response did not contain target header");
308 }
309 Ok(outcome) => {
310 debug!(
311 reason = outcome.failure_reason(),
312 "Sync head probe attempt {attempt}/{PROBE_SYNC_HEAD_ATTEMPTS}: no headers"
313 );
314 }
315 Err(e) => {
316 warn!("Sync head probe attempt {attempt}/{PROBE_SYNC_HEAD_ATTEMPTS} failed: {e}");
317 }
318 }
319 if attempt < PROBE_SYNC_HEAD_ATTEMPTS {
320 tokio::time::sleep(PROBE_SYNC_HEAD_RETRY_DELAY).await;
321 }
322 }
323 None
324}
325
326#[derive(Debug, Default)]
327#[allow(clippy::type_complexity)]
328pub struct AccountStorageRoots {
330 pub accounts_with_storage_root: BTreeMap<H256, (Option<H256>, Vec<(H256, H256)>)>,
333 pub healed_accounts: HashSet<H256>,
336}
337
338#[derive(thiserror::Error, Debug)]
339pub enum SyncError {
340 #[error(transparent)]
341 Chain(#[from] ChainError),
342 #[error(transparent)]
343 Store(#[from] StoreError),
344 #[error("{0}")]
345 Send(String),
346 #[error(transparent)]
347 Trie(#[from] TrieError),
348 #[error(transparent)]
349 Rlp(#[from] RLPDecodeError),
350 #[error(transparent)]
351 JoinHandle(#[from] tokio::task::JoinError),
352 #[error("Missing data from DB")]
353 CorruptDB,
354 #[error("Failed to fetch latest canonical block, unable to sync")]
355 NoLatestCanonical,
356 #[error("Range received is invalid")]
357 InvalidRangeReceived,
358 #[error("Failed to fetch block number for head {0}")]
359 BlockNumber(H256),
360 #[error("No blocks found")]
361 NoBlocks,
362 #[error("Failed to read snapshot from {0:?} with error {1:?}")]
363 SnapshotReadError(PathBuf, std::io::Error),
364 #[error("Failed to RLP decode account_state_snapshot from {0:?}")]
365 SnapshotDecodeError(PathBuf),
366 #[error("Failed to RLP decode code_hashes_snapshot from {0:?}")]
367 CodeHashesSnapshotDecodeError(PathBuf),
368 #[error("Failed to get account state for block {0:?} and account hash {1:?}")]
369 AccountState(H256, H256),
370 #[error("Failed to fetch bytecodes from peers")]
371 BytecodesNotFound,
372 #[error("Failed to get account state snapshots directory")]
373 AccountStateSnapshotsDirNotFound,
374 #[error("Failed to get account storages snapshots directory")]
375 AccountStoragesSnapshotsDirNotFound,
376 #[error("Failed to get code hashes snapshots directory")]
377 CodeHashesSnapshotsDirNotFound,
378 #[error("Got different state roots for account hash: {0:?}, expected: {1:?}, computed: {2:?}")]
379 DifferentStateRoots(H256, H256, H256),
380 #[error("Failed to get block headers")]
381 NoBlockHeaders,
382 #[error("Peer handler error: {0}")]
383 PeerHandler(#[from] PeerHandlerError),
384 #[error("Parent not found in healing queue. Parent: {0}, path: {1}")]
385 HealingQueueInconsistency(String, String),
386 #[error("Filesystem error: {0}")]
387 FileSystem(String),
388 #[error("Sorted Trie Generation Error: {0}")]
389 TrieGenerationError(#[from] TrieGenerationError),
390 #[error("Failed to get account temp db directory: {0}")]
391 AccountTempDBDirNotFound(String),
392 #[error("Failed to get storage temp db directory: {0}")]
393 StorageTempDBDirNotFound(String),
394 #[error("RocksDB Error: {0}")]
395 RocksDBError(String),
396 #[error("Bytecode file error")]
397 BytecodeFileError,
398 #[error("Error in Peer Table: {0}")]
399 PeerTableError(#[from] ActorError),
400 #[error("Missing fullsync batch")]
401 MissingFullsyncBatch,
402 #[error("Snap error: {0}")]
403 Snap(#[from] crate::snap::SnapError),
404}
405
406impl SyncError {
407 pub fn is_recoverable(&self) -> bool {
408 if let SyncError::PeerHandler(e) = self {
412 return e.is_recoverable();
413 }
414 match self {
415 SyncError::SnapshotReadError(_, _)
416 | SyncError::SnapshotDecodeError(_)
417 | SyncError::CodeHashesSnapshotDecodeError(_)
418 | SyncError::AccountState(_, _)
419 | SyncError::BytecodesNotFound
420 | SyncError::AccountStateSnapshotsDirNotFound
421 | SyncError::AccountStoragesSnapshotsDirNotFound
422 | SyncError::CodeHashesSnapshotsDirNotFound
423 | SyncError::DifferentStateRoots(_, _, _)
424 | SyncError::HealingQueueInconsistency(_, _)
425 | SyncError::TrieGenerationError(_)
426 | SyncError::AccountTempDBDirNotFound(_)
427 | SyncError::StorageTempDBDirNotFound(_)
428 | SyncError::RocksDBError(_)
429 | SyncError::BytecodeFileError
430 | SyncError::NoLatestCanonical
431 | SyncError::MissingFullsyncBatch
432 | SyncError::Snap(_)
433 | SyncError::FileSystem(_) => false,
434 SyncError::PeerTableError(ActorError::RequestTimeout) => true,
439 SyncError::PeerTableError(ActorError::ActorStopped) => false,
440 SyncError::Chain(_)
441 | SyncError::Store(_)
442 | SyncError::Send(_)
443 | SyncError::Trie(_)
444 | SyncError::Rlp(_)
445 | SyncError::JoinHandle(_)
446 | SyncError::CorruptDB
447 | SyncError::InvalidRangeReceived
448 | SyncError::BlockNumber(_)
449 | SyncError::NoBlocks
450 | SyncError::NoBlockHeaders => true,
451 SyncError::PeerHandler(_) => unreachable!(),
453 }
454 }
455}
456
457impl<T> From<SendError<T>> for SyncError {
458 fn from(value: SendError<T>) -> Self {
459 Self::Send(value.to_string())
460 }
461}