Skip to main content

ethrex_p2p/
sync.rs

1//! Sync module - orchestrates full and snap synchronization
2//!
3//! This module provides the main `Syncer` type that coordinates synchronization
4//! between full sync mode (all blocks executed) and snap sync mode (state fetched
5//! via snap protocol).
6
7mod code_collector;
8mod full;
9mod healing;
10mod snap_sync;
11
12/// Test-only re-export of the full-sync resume-point predicate so integration tests can
13/// assert that canonical-but-stateless blocks are not treated as already-executed.
14#[cfg(feature = "test-utils")]
15pub use full::{first_resume_point_in_batch, is_resume_point};
16
17use crate::metrics::METRICS;
18use crate::peer_handler::{BlockRequestOrder, HeaderFetchOutcome, PeerHandler, PeerHandlerError};
19use crate::snap::constants::{EXECUTE_BATCH_SIZE_DEFAULT, MIN_FULL_BLOCKS};
20use crate::utils::delete_leaves_folder;
21use ethrex_blockchain::{Blockchain, error::ChainError};
22use ethrex_common::H256;
23use ethrex_rlp::error::RLPDecodeError;
24use ethrex_storage::{Store, error::StoreError};
25use ethrex_trie::TrieError;
26use ethrex_trie::trie_sorted::TrieGenerationError;
27use spawned_concurrency::error::ActorError;
28use std::collections::{BTreeMap, HashSet};
29use std::path::PathBuf;
30use std::sync::{
31    Arc,
32    atomic::{AtomicBool, Ordering},
33};
34use tokio::sync::mpsc::error::SendError;
35use tokio::time::Instant;
36use tokio_util::sync::CancellationToken;
37use tracing::{debug, error, info, warn};
38
39// Re-export types used by submodules
40pub use snap_sync::{
41    SnapBlockSyncState, block_is_stale, calculate_staleness_timestamp, update_pivot,
42    validate_bytecodes, validate_state_root, validate_storage_root,
43};
44
45#[cfg(feature = "sync-test")]
46lazy_static::lazy_static! {
47    static ref EXECUTE_BATCH_SIZE: usize = std::env::var("EXECUTE_BATCH_SIZE").map(|var| var.parse().expect("Execute batch size environmental variable is not a number")).unwrap_or(EXECUTE_BATCH_SIZE_DEFAULT);
48}
49#[cfg(not(feature = "sync-test"))]
50lazy_static::lazy_static! {
51    static ref EXECUTE_BATCH_SIZE: usize = EXECUTE_BATCH_SIZE_DEFAULT;
52}
53
/// Synchronization strategy a node can run; `Full` is the default.
#[derive(Clone, Debug, Default, PartialEq)]
pub enum SyncMode {
    /// Every block is fetched and executed to rebuild the state.
    #[default]
    Full,
    /// State is fetched through the snap protocol.
    Snap,
}
60
/// Diagnostic snapshot of the sync state, used by admin RPC endpoints.
///
/// `Syncer` keeps it behind an `Arc<tokio::sync::RwLock<_>>` and hands it to the sync
/// cycles. Field declaration order is also the serialized (and `Debug`) field order.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct SyncDiagnostics {
    /// Active sync mode as free-form text. NOTE(review): presumably mirrors `SyncMode` —
    /// confirm at the writer.
    pub sync_mode: String,
    /// Name of the phase currently running; `Syncer::start_sync` resets it to `"idle"`
    /// whenever a cycle ends, successfully or not.
    pub current_phase: String,
    /// Highest block whose post-state is actually on disk (the executed/state head).
    /// Updated by the full-sync cycle. May trail the canonical head when an FCU
    /// canonicalized blocks before their state was computed; `eth_syncing` reports
    /// this rather than the canonical pointer so the node isn't shown as near-synced
    /// while it has no state up to the tip.
    pub executed_head: u64,
    /// Block number of the current pivot, if one is set (presumably only during snap
    /// sync — NOTE(review): confirm).
    pub pivot_block_number: Option<u64>,
    /// Timestamp of the current pivot block, if known. NOTE(review): units not visible
    /// here, presumably UNIX seconds — confirm.
    pub pivot_timestamp: Option<u64>,
    /// Age of the current pivot, in seconds (per the field name).
    pub pivot_age_seconds: Option<u64>,
    /// Pivot age, in seconds, presumably used as the staleness cut-off — NOTE(review):
    /// confirm against the staleness check.
    pub staleness_threshold_seconds: u64,
    /// Per-phase progress counters keyed by name. NOTE(review): the key set is defined by
    /// the writers, not in this file.
    pub phase_progress: std::collections::HashMap<String, u64>,
    /// Most recent pivot changes, oldest first; bounded when appended through
    /// `push_pivot_change`.
    pub recent_pivot_changes: std::collections::VecDeque<PivotChangeEvent>,
    /// Most recent sync-cycle errors, oldest first; bounded when appended through
    /// `push_error`.
    pub recent_errors: std::collections::VecDeque<SyncErrorEvent>,
}
80
/// One entry of `SyncDiagnostics::recent_pivot_changes`, describing a change (or an
/// attempted change) of the sync pivot.
#[derive(Debug, Clone, serde::Serialize)]
pub struct PivotChangeEvent {
    /// When the event was recorded. NOTE(review): presumably UNIX seconds like
    /// `SyncErrorEvent::timestamp` — confirm at the producer.
    pub timestamp: u64,
    /// Pivot block number before the change.
    pub old_pivot_number: u64,
    /// Pivot block number the sync moved (or tried to move) to.
    pub new_pivot_number: u64,
    /// Free-form outcome label. NOTE(review): the allowed values are chosen by the
    /// producer, not visible in this file.
    pub outcome: String,
    /// Human-readable failure reason; presumably `None` when the change succeeded.
    pub failure_reason: Option<String>,
}
89
/// One entry of `SyncDiagnostics::recent_errors`, describing a failed sync cycle.
///
/// `Syncer::start_sync` records one of these for every cycle that returns an error.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SyncErrorEvent {
    /// Seconds since the UNIX epoch at which the failure was recorded (0 if the system
    /// clock reads earlier than the epoch).
    pub timestamp: u64,
    /// `Debug` rendering of the error's `std::mem::discriminant`: an opaque variant
    /// identifier, not the variant name.
    pub error_type: String,
    /// The error's `Display` text.
    pub error_message: String,
    /// Outcome of `SyncError::is_recoverable`; `false` means the node exits right after
    /// recording the event.
    pub recoverable: bool,
}
97
98impl SyncDiagnostics {
99    const MAX_PIVOT_CHANGES: usize = 10;
100    const MAX_ERRORS: usize = 20;
101
102    pub fn push_pivot_change(&mut self, event: PivotChangeEvent) {
103        if self.recent_pivot_changes.len() >= Self::MAX_PIVOT_CHANGES {
104            self.recent_pivot_changes.pop_front();
105        }
106        self.recent_pivot_changes.push_back(event);
107    }
108
109    pub fn push_error(&mut self, event: SyncErrorEvent) {
110        if self.recent_errors.len() >= Self::MAX_ERRORS {
111            self.recent_errors.pop_front();
112        }
113        self.recent_errors.push_back(event);
114    }
115}
116
/// Manager in charge of the sync process.
///
/// Each `start_sync` call runs one sync cycle (full or snap) toward a given head hash.
#[derive(Debug)]
pub struct Syncer {
    /// Shared flag selecting snap sync (`true`) or full sync (`false`) for a cycle.
    /// This is also held by the SyncManager, allowing it to track the latest sync mode
    /// without modifying it. No outside process should modify this value; only the sync
    /// cycle does.
    snap_enabled: Arc<AtomicBool>,
    /// Handle used for every p2p request made while syncing (head probe, full and snap sync).
    peers: PeerHandler,
    /// Used for cancelling long-living tasks upon shutdown; handed to the full sync cycle.
    cancel_token: CancellationToken,
    /// Blockchain handle passed to both the full and the snap sync cycles.
    blockchain: Arc<Blockchain>,
    /// Folder where the snap algorithm stores temporary files used during the syncing
    /// process. Its leaves folder is wiped before each snap cycle starts.
    datadir: PathBuf,
    /// Diagnostics snapshot shared with the admin RPC endpoints and updated by the cycles.
    diagnostics: Arc<tokio::sync::RwLock<SyncDiagnostics>>,
}
132
133impl Syncer {
134    pub fn new(
135        peers: PeerHandler,
136        snap_enabled: Arc<AtomicBool>,
137        cancel_token: CancellationToken,
138        blockchain: Arc<Blockchain>,
139        datadir: PathBuf,
140        diagnostics: Arc<tokio::sync::RwLock<SyncDiagnostics>>,
141    ) -> Self {
142        Self {
143            snap_enabled,
144            peers,
145            cancel_token,
146            blockchain,
147            datadir,
148            diagnostics,
149        }
150    }
151
152    /// Starts a sync cycle, updating the state with all blocks between the current head and the sync head
153    /// Will perform either full or snap sync depending on the manager's `snap_mode`
154    /// In full mode, all blocks will be fetched via p2p eth requests and executed to rebuild the state
155    /// In snap mode, blocks and receipts will be fetched and stored in parallel while the state is fetched via p2p snap requests
156    /// After the sync cycle is complete, the sync mode will be set to full
157    /// If the sync fails, no error will be returned but a warning will be emitted
158    /// [WARNING] Sync is done optimistically, so headers and bodies may be stored even if their data has not been fully synced if the sync is aborted halfway
159    /// [WARNING] Sync is currenlty simplified and will not download bodies + receipts previous to the pivot during snap sync
160    pub async fn start_sync(&mut self, sync_head: H256, store: Store) {
161        let start_time = Instant::now();
162        match self.sync_cycle(sync_head, store).await {
163            Ok(()) => {
164                self.diagnostics.write().await.current_phase = "idle".to_string();
165                info!(
166                    time_elapsed_s = start_time.elapsed().as_secs(),
167                    %sync_head,
168                    "Sync cycle finished successfully",
169                );
170            }
171
172            // If the error is irrecoverable, we exit ethrex
173            Err(error) => {
174                let recoverable = error.is_recoverable();
175                self.diagnostics.write().await.current_phase = "idle".to_string();
176                debug!(
177                    error_type = %error,
178                    recoverable = recoverable,
179                    action = if recoverable { "retry" } else { "exit" },
180                    "Sync cycle error classification"
181                );
182                self.diagnostics.write().await.push_error(SyncErrorEvent {
183                    timestamp: std::time::SystemTime::now()
184                        .duration_since(std::time::UNIX_EPOCH)
185                        .unwrap_or_default()
186                        .as_secs(),
187                    error_type: format!("{:?}", std::mem::discriminant(&error)),
188                    error_message: error.to_string(),
189                    recoverable,
190                });
191                match recoverable {
192                    false => {
193                        // We exit the node, as we can't recover this error
194                        error!(
195                            time_elapsed_s = start_time.elapsed().as_secs(),
196                            %sync_head,
197                            %error, "Sync cycle failed, exiting as the error is irrecoverable",
198                        );
199                        std::process::exit(2);
200                    }
201                    true => {
202                        // We do nothing, as the error is recoverable
203                        warn!(
204                            time_elapsed_s = start_time.elapsed().as_secs(),
205                            %sync_head,
206                            %error, "Sync cycle failed, retrying",
207                        );
208                    }
209                }
210            }
211        }
212    }
213
214    /// Performs the sync cycle described in `start_sync`, returns an error if the sync fails at any given step and aborts all active processes
215    async fn sync_cycle(&mut self, sync_head: H256, store: Store) -> Result<(), SyncError> {
216        // Take picture of the current sync mode, we will update the original value when we need to
217        if self.snap_enabled.load(Ordering::Relaxed) {
218            // Probe the sync head's block number before committing to snap sync.
219            // On a fresh devnet the chain head may be only a few blocks deep; the
220            // existing in-loop `head_close_to_0` guard in `sync_cycle_snap`
221            // (snap_sync.rs, same `< MIN_FULL_BLOCKS` check) is only reached
222            // after a successful header batch, which can stall when peers are
223            // barely synced themselves. Pre-checking avoids that. The probe
224            // response is intentionally discarded; on the snap path the loop
225            // re-fetches headers, which keeps `sync_cycle_snap`'s entry simple.
226            if let Some(sync_head_number) = probe_sync_head_number(&mut self.peers, sync_head).await
227                && sync_head_number < MIN_FULL_BLOCKS
228            {
229                info!(
230                    sync_head_number,
231                    "Sync head below MIN_FULL_BLOCKS ({MIN_FULL_BLOCKS}), using full sync"
232                );
233                self.snap_enabled.store(false, Ordering::Relaxed);
234                // Clear any stale snap checkpoint so the manager loop in
235                // `sync_manager.rs` doesn't keep re-entering this branch
236                // after the full sync completes. Mirrors the cleanup done
237                // when the manager auto-switches to full on startup.
238                if let Err(e) = store.clear_snap_state().await {
239                    warn!("Failed to clear stale snap state: {e}");
240                }
241                return full::sync_cycle_full(
242                    &mut self.peers,
243                    self.blockchain.clone(),
244                    self.cancel_token.clone(),
245                    sync_head,
246                    store,
247                    &self.diagnostics,
248                )
249                .await;
250            }
251            METRICS.enable().await;
252            // We validate that we have the folders that are being used empty, as we currently assume
253            // they are. If they are not empty we empty the folder
254            delete_leaves_folder(&self.datadir);
255            let sync_cycle_result = snap_sync::sync_cycle_snap(
256                &mut self.peers,
257                self.blockchain.clone(),
258                &self.snap_enabled,
259                sync_head,
260                store,
261                &self.datadir,
262                &self.diagnostics,
263            )
264            .await;
265            METRICS.disable().await;
266            sync_cycle_result
267        } else {
268            full::sync_cycle_full(
269                &mut self.peers,
270                self.blockchain.clone(),
271                self.cancel_token.clone(),
272                sync_head,
273                store,
274                &self.diagnostics,
275            )
276            .await
277        }
278    }
279}
280
281/// Number of attempts to fetch the sync head's header for the snap-vs-full pre-check.
282const PROBE_SYNC_HEAD_ATTEMPTS: u32 = 3;
283/// Delay between probe attempts.
284const PROBE_SYNC_HEAD_RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(2);
285
286/// Tries to fetch the block header for `sync_head` and return its number.
287///
288/// Returns `None` if peers don't respond with the requested header within
289/// `PROBE_SYNC_HEAD_ATTEMPTS`. Callers should treat that as "couldn't decide"
290/// and fall through to the regular sync path.
291///
292/// Worst-case latency budget: `PROBE_SYNC_HEAD_ATTEMPTS` × `PEER_REPLY_TIMEOUT`
293/// (5s, from `snap/constants.rs`) + (`PROBE_SYNC_HEAD_ATTEMPTS` − 1) ×
294/// `PROBE_SYNC_HEAD_RETRY_DELAY` = ~19s on a peer-starved network before we
295/// fall through to the snap path. On a healthy network the first attempt
296/// usually returns in well under a second.
297async fn probe_sync_head_number(peers: &mut PeerHandler, sync_head: H256) -> Option<u64> {
298    for attempt in 1..=PROBE_SYNC_HEAD_ATTEMPTS {
299        match peers
300            .request_block_headers_from_hash(sync_head, BlockRequestOrder::NewToOld)
301            .await
302        {
303            Ok(HeaderFetchOutcome::Headers(headers)) => {
304                if let Some(header) = headers.iter().find(|h| h.hash() == sync_head) {
305                    return Some(header.number);
306                }
307                debug!("Sync head probe: response did not contain target header");
308            }
309            Ok(outcome) => {
310                debug!(
311                    reason = outcome.failure_reason(),
312                    "Sync head probe attempt {attempt}/{PROBE_SYNC_HEAD_ATTEMPTS}: no headers"
313                );
314            }
315            Err(e) => {
316                warn!("Sync head probe attempt {attempt}/{PROBE_SYNC_HEAD_ATTEMPTS} failed: {e}");
317            }
318        }
319        if attempt < PROBE_SYNC_HEAD_ATTEMPTS {
320            tokio::time::sleep(PROBE_SYNC_HEAD_RETRY_DELAY).await;
321        }
322    }
323    None
324}
325
326#[derive(Debug, Default)]
327#[allow(clippy::type_complexity)]
328/// We store for optimization the accounts that need to heal storage
329pub struct AccountStorageRoots {
330    /// The accounts that have not been healed are guaranteed to have the original storage root
331    /// we can read this storage root
332    pub accounts_with_storage_root: BTreeMap<H256, (Option<H256>, Vec<(H256, H256)>)>,
333    /// If an account has been healed, it may return to a previous state, so we just store the account
334    /// in a hashset
335    pub healed_accounts: HashSet<H256>,
336}
337
/// Errors that can abort a sync cycle.
///
/// `SyncError::is_recoverable` classifies every variant. `Syncer::start_sync` retries after
/// recoverable errors and exits the process after irrecoverable ones.
///
/// NOTE: keep variant order stable. `start_sync` records `std::mem::discriminant` of the
/// error in the diagnostics, so reordering variants likely changes what is reported.
#[derive(thiserror::Error, Debug)]
pub enum SyncError {
    /// Failure bubbled up from the blockchain layer.
    #[error(transparent)]
    Chain(#[from] ChainError),
    /// Failure bubbled up from the storage layer.
    #[error(transparent)]
    Store(#[from] StoreError),
    /// A channel send failed; holds the `SendError` display text (see the
    /// `From<SendError<T>>` impl).
    #[error("{0}")]
    Send(String),
    /// A trie operation failed.
    #[error(transparent)]
    Trie(#[from] TrieError),
    /// RLP decoding failed.
    #[error(transparent)]
    Rlp(#[from] RLPDecodeError),
    /// A spawned tokio task could not be joined (it panicked or was cancelled).
    #[error(transparent)]
    JoinHandle(#[from] tokio::task::JoinError),
    /// Data expected in the database was missing.
    #[error("Missing data from DB")]
    CorruptDB,
    /// The latest canonical block could not be fetched.
    #[error("Failed to fetch latest canonical block, unable to sync")]
    NoLatestCanonical,
    /// A received range was invalid.
    #[error("Range received is invalid")]
    InvalidRangeReceived,
    /// The block number for the given head hash could not be fetched.
    #[error("Failed to fetch block number for head {0}")]
    BlockNumber(H256),
    /// No blocks were found.
    #[error("No blocks found")]
    NoBlocks,
    /// Reading a snapshot file at the given path failed with the given I/O error.
    #[error("Failed to read snapshot from {0:?} with error {1:?}")]
    SnapshotReadError(PathBuf, std::io::Error),
    /// An account-state snapshot at the given path could not be RLP-decoded.
    #[error("Failed to RLP decode account_state_snapshot from {0:?}")]
    SnapshotDecodeError(PathBuf),
    /// A code-hashes snapshot at the given path could not be RLP-decoded.
    #[error("Failed to RLP decode code_hashes_snapshot from {0:?}")]
    CodeHashesSnapshotDecodeError(PathBuf),
    /// Account state lookup failed for (block, account hash).
    #[error("Failed to get account state for block {0:?} and account hash {1:?}")]
    AccountState(H256, H256),
    /// Bytecodes could not be fetched from peers.
    #[error("Failed to fetch bytecodes from peers")]
    BytecodesNotFound,
    /// The account-state snapshots directory could not be obtained.
    #[error("Failed to get account state snapshots directory")]
    AccountStateSnapshotsDirNotFound,
    /// The account-storages snapshots directory could not be obtained.
    #[error("Failed to get account storages snapshots directory")]
    AccountStoragesSnapshotsDirNotFound,
    /// The code-hashes snapshots directory could not be obtained.
    #[error("Failed to get code hashes snapshots directory")]
    CodeHashesSnapshotsDirNotFound,
    /// State root mismatch: (account hash, expected root, computed root).
    #[error("Got different state roots for account hash: {0:?}, expected: {1:?}, computed: {2:?}")]
    DifferentStateRoots(H256, H256, H256),
    /// Block headers could not be obtained.
    #[error("Failed to get block headers")]
    NoBlockHeaders,
    /// Peer-handler failure; its recoverability is delegated to the inner error.
    #[error("Peer handler error: {0}")]
    PeerHandler(#[from] PeerHandlerError),
    /// A parent was missing from the healing queue: (parent, path).
    #[error("Parent not found in healing queue. Parent: {0}, path: {1}")]
    HealingQueueInconsistency(String, String),
    /// A filesystem operation failed; holds a description.
    #[error("Filesystem error: {0}")]
    FileSystem(String),
    /// Sorted trie generation failed.
    #[error("Sorted Trie Generation Error: {0}")]
    TrieGenerationError(#[from] TrieGenerationError),
    /// The account temp DB directory could not be obtained.
    #[error("Failed to get account temp db directory: {0}")]
    AccountTempDBDirNotFound(String),
    /// The storage temp DB directory could not be obtained.
    #[error("Failed to get storage temp db directory: {0}")]
    StorageTempDBDirNotFound(String),
    /// A RocksDB operation failed; holds a description.
    #[error("RocksDB Error: {0}")]
    RocksDBError(String),
    /// A bytecode file could not be handled.
    #[error("Bytecode file error")]
    BytecodeFileError,
    /// An actor request to the peer table failed. A request timeout is treated as
    /// recoverable; a stopped actor is fatal.
    #[error("Error in Peer Table: {0}")]
    PeerTableError(#[from] ActorError),
    /// An expected full-sync batch was missing.
    #[error("Missing fullsync batch")]
    MissingFullsyncBatch,
    /// Failure bubbled up from the snap module.
    #[error("Snap error: {0}")]
    Snap(#[from] crate::snap::SnapError),
}
405
406impl SyncError {
407    pub fn is_recoverable(&self) -> bool {
408        // PeerHandler delegates to its own classification so that transient
409        // peer/network errors retry while structural errors (dead actor,
410        // local storage full) still exit.
411        if let SyncError::PeerHandler(e) = self {
412            return e.is_recoverable();
413        }
414        match self {
415            SyncError::SnapshotReadError(_, _)
416            | SyncError::SnapshotDecodeError(_)
417            | SyncError::CodeHashesSnapshotDecodeError(_)
418            | SyncError::AccountState(_, _)
419            | SyncError::BytecodesNotFound
420            | SyncError::AccountStateSnapshotsDirNotFound
421            | SyncError::AccountStoragesSnapshotsDirNotFound
422            | SyncError::CodeHashesSnapshotsDirNotFound
423            | SyncError::DifferentStateRoots(_, _, _)
424            | SyncError::HealingQueueInconsistency(_, _)
425            | SyncError::TrieGenerationError(_)
426            | SyncError::AccountTempDBDirNotFound(_)
427            | SyncError::StorageTempDBDirNotFound(_)
428            | SyncError::RocksDBError(_)
429            | SyncError::BytecodeFileError
430            | SyncError::NoLatestCanonical
431            | SyncError::MissingFullsyncBatch
432            | SyncError::Snap(_)
433            | SyncError::FileSystem(_) => false,
434            // A timed-out actor request is transient (mailbox pressure or a
435            // slow handler — requests use spawned-concurrency's 5s default
436            // timeout); a stopped actor means p2p is shutting down and must
437            // stay fatal.
438            SyncError::PeerTableError(ActorError::RequestTimeout) => true,
439            SyncError::PeerTableError(ActorError::ActorStopped) => false,
440            SyncError::Chain(_)
441            | SyncError::Store(_)
442            | SyncError::Send(_)
443            | SyncError::Trie(_)
444            | SyncError::Rlp(_)
445            | SyncError::JoinHandle(_)
446            | SyncError::CorruptDB
447            | SyncError::InvalidRangeReceived
448            | SyncError::BlockNumber(_)
449            | SyncError::NoBlocks
450            | SyncError::NoBlockHeaders => true,
451            // PeerHandler handled above by delegation
452            SyncError::PeerHandler(_) => unreachable!(),
453        }
454    }
455}
456
457impl<T> From<SendError<T>> for SyncError {
458    fn from(value: SendError<T>) -> Self {
459        Self::Send(value.to_string())
460    }
461}