ethrex-p2p 17.0.0

Peer-to-peer networking (discv4/discv5, RLPx, eth, snap) for the ethrex Ethereum client
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
//! Sync module - orchestrates full and snap synchronization
//!
//! This module provides the main `Syncer` type that coordinates synchronization
//! between full sync mode (all blocks executed) and snap sync mode (state fetched
//! via snap protocol).

mod code_collector;
mod full;
mod healing;
mod snap_sync;

/// Test-only re-export of the full-sync resume-point predicate so integration tests can
/// assert that canonical-but-stateless blocks are not treated as already-executed.
#[cfg(feature = "test-utils")]
pub use full::{first_resume_point_in_batch, is_resume_point};

use crate::metrics::METRICS;
use crate::peer_handler::{BlockRequestOrder, HeaderFetchOutcome, PeerHandler, PeerHandlerError};
use crate::snap::constants::{EXECUTE_BATCH_SIZE_DEFAULT, MIN_FULL_BLOCKS};
use crate::utils::delete_leaves_folder;
use ethrex_blockchain::{Blockchain, error::ChainError};
use ethrex_common::H256;
use ethrex_rlp::error::RLPDecodeError;
use ethrex_storage::{Store, error::StoreError};
use ethrex_trie::TrieError;
use ethrex_trie::trie_sorted::TrieGenerationError;
use spawned_concurrency::error::ActorError;
use std::collections::{BTreeMap, HashSet};
use std::path::PathBuf;
use std::sync::{
    Arc,
    atomic::{AtomicBool, Ordering},
};
use tokio::sync::mpsc::error::SendError;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

// Re-export types used by submodules
pub use snap_sync::{
    SnapBlockSyncState, block_is_stale, calculate_staleness_timestamp, update_pivot,
    validate_bytecodes, validate_state_root, validate_storage_root,
};

// Batch size used when executing blocks during sync.
//
// With the `sync-test` feature the value can be overridden through the `EXECUTE_BATCH_SIZE`
// environment variable. It falls back to `EXECUTE_BATCH_SIZE_DEFAULT` when the variable is unset
// or unreadable, and panics on first use if the variable is set but is not a number. Without the
// feature it is always `EXECUTE_BATCH_SIZE_DEFAULT`.
#[cfg(feature = "sync-test")]
lazy_static::lazy_static! {
    static ref EXECUTE_BATCH_SIZE: usize = match std::env::var("EXECUTE_BATCH_SIZE") {
        Ok(raw_value) => raw_value
            .parse()
            .expect("Execute batch size environmental variable is not a number"),
        Err(_) => EXECUTE_BATCH_SIZE_DEFAULT,
    };
}
#[cfg(not(feature = "sync-test"))]
lazy_static::lazy_static! {
    static ref EXECUTE_BATCH_SIZE: usize = EXECUTE_BATCH_SIZE_DEFAULT;
}

/// Strategy used to synchronize the node.
///
/// `Full` is the default.
#[derive(Clone, Debug, Default, PartialEq)]
pub enum SyncMode {
    /// Every block is fetched and executed to rebuild the state.
    #[default]
    Full,
    /// The state is fetched through the snap protocol instead of being re-executed.
    Snap,
}

/// Diagnostic snapshot of the sync state, used by admin RPC endpoints.
///
/// The [`Syncer`] holds it behind an `Arc<tokio::sync::RwLock<_>>` and updates it as sync
/// cycles run. It is serialized with `serde`, so field order here is the order seen by RPC
/// consumers.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct SyncDiagnostics {
    /// Name of the current sync mode.
    /// NOTE(review): the writer is outside this file; confirm the exact values.
    pub sync_mode: String,
    /// Name of the phase the sync is in. `Syncer::start_sync` resets it to `"idle"` whenever a
    /// sync cycle ends, whether it succeeded or failed.
    pub current_phase: String,
    /// Highest block whose post-state is actually on disk (the executed/state head).
    /// Updated by the full-sync cycle. May trail the canonical head when an FCU
    /// canonicalized blocks before their state was computed; `eth_syncing` reports
    /// this rather than the canonical pointer so the node isn't shown as near-synced
    /// while it has no state up to the tip.
    pub executed_head: u64,
    /// Block number of the current snap-sync pivot, if any.
    /// Presumably `None` outside snap sync; TODO confirm.
    pub pivot_block_number: Option<u64>,
    /// Timestamp of the pivot block, if any.
    /// Presumably seconds since the Unix epoch; TODO confirm.
    pub pivot_timestamp: Option<u64>,
    /// Age of the pivot, in seconds, if known.
    pub pivot_age_seconds: Option<u64>,
    /// Staleness threshold for the pivot, in seconds.
    /// Presumably compared against `pivot_age_seconds`; TODO confirm.
    pub staleness_threshold_seconds: u64,
    /// Progress counters keyed by name.
    /// NOTE(review): keys and units are set outside this file.
    pub phase_progress: std::collections::HashMap<String, u64>,
    /// Most recent pivot changes, oldest at the front. `push_pivot_change` keeps it bounded.
    pub recent_pivot_changes: std::collections::VecDeque<PivotChangeEvent>,
    /// Most recent sync-cycle errors, oldest at the front. `push_error` keeps it bounded.
    pub recent_errors: std::collections::VecDeque<SyncErrorEvent>,
}

/// Record of a pivot change.
///
/// Stored in `SyncDiagnostics::recent_pivot_changes` via `SyncDiagnostics::push_pivot_change`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct PivotChangeEvent {
    /// When the change happened.
    /// Presumably seconds since the Unix epoch, like `SyncErrorEvent::timestamp`; confirm at
    /// the call site.
    pub timestamp: u64,
    /// Pivot block number before the change.
    pub old_pivot_number: u64,
    /// Pivot block number after the change.
    pub new_pivot_number: u64,
    /// Outcome label for the change.
    /// NOTE(review): the possible values are set outside this file.
    pub outcome: String,
    /// Reason the change failed, if it did. Presumably `None` on success; TODO confirm.
    pub failure_reason: Option<String>,
}

/// Record of a failed sync cycle.
///
/// `Syncer::start_sync` pushes one into `SyncDiagnostics::recent_errors` on every failure.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SyncErrorEvent {
    /// Seconds since the Unix epoch at which the failure was recorded. This is 0 if the system
    /// clock is before the epoch.
    pub timestamp: u64,
    /// `Debug` rendering of the `SyncError` variant's `std::mem::discriminant`.
    pub error_type: String,
    /// `Display` rendering of the error.
    pub error_message: String,
    /// Result of `SyncError::is_recoverable`: whether the node retried or exited.
    pub recoverable: bool,
}

impl SyncDiagnostics {
    /// Maximum number of entries kept in `recent_pivot_changes`.
    const MAX_PIVOT_CHANGES: usize = 10;
    /// Maximum number of entries kept in `recent_errors`.
    const MAX_ERRORS: usize = 20;

    /// Appends a pivot change, evicting the oldest entries so that at most
    /// `MAX_PIVOT_CHANGES` remain.
    pub fn push_pivot_change(&mut self, event: PivotChangeEvent) {
        Self::push_bounded(
            &mut self.recent_pivot_changes,
            Self::MAX_PIVOT_CHANGES,
            event,
        );
    }

    /// Appends a sync error, evicting the oldest entries so that at most `MAX_ERRORS` remain.
    pub fn push_error(&mut self, event: SyncErrorEvent) {
        Self::push_bounded(&mut self.recent_errors, Self::MAX_ERRORS, event);
    }

    /// Pushes `item` to the back of `buffer`, first dropping entries from the front until there
    /// is room for it under `capacity`.
    ///
    /// A loop (rather than a single `pop_front`) is used because the deques are public fields.
    /// If a caller filled one directly beyond its cap, this restores the bound instead of letting
    /// it stay oversized forever. The `is_empty` check keeps a zero `capacity` from looping
    /// endlessly.
    fn push_bounded<T>(buffer: &mut std::collections::VecDeque<T>, capacity: usize, item: T) {
        while !buffer.is_empty() && buffer.len() >= capacity {
            buffer.pop_front();
        }
        buffer.push_back(item);
    }
}

/// Manager in charge of the sync process.
///
/// Runs sync cycles (see `start_sync`) in either full or snap mode, as selected by the shared
/// `snap_enabled` flag.
#[derive(Debug)]
pub struct Syncer {
    /// This is also held by the SyncManager allowing it to track the latest syncmode, without modifying it
    /// No outside process should modify this value, only being modified by the sync cycle
    snap_enabled: Arc<AtomicBool>,
    /// Peer handler passed to the sync-head probe and to the full/snap sync routines.
    peers: PeerHandler,
    // Used for cancelling long-living tasks upon shutdown
    cancel_token: CancellationToken,
    /// Blockchain handle passed to the full/snap sync routines.
    blockchain: Arc<Blockchain>,
    /// Directory where the snap algorithm stores temporary files used during the syncing
    /// process. Its leaves folder is emptied before every snap cycle.
    datadir: PathBuf,
    /// Diagnostics snapshot shared with the admin RPC, updated as sync cycles run.
    diagnostics: Arc<tokio::sync::RwLock<SyncDiagnostics>>,
}

impl Syncer {
    /// Builds a syncer from its collaborators.
    ///
    /// `snap_enabled` and `diagnostics` are shared `Arc` handles. Changes the sync cycle makes
    /// through them are visible to every other holder.
    pub fn new(
        peers: PeerHandler,
        snap_enabled: Arc<AtomicBool>,
        cancel_token: CancellationToken,
        blockchain: Arc<Blockchain>,
        datadir: PathBuf,
        diagnostics: Arc<tokio::sync::RwLock<SyncDiagnostics>>,
    ) -> Self {
        Self {
            snap_enabled,
            peers,
            cancel_token,
            blockchain,
            datadir,
            diagnostics,
        }
    }

    /// Starts a sync cycle, updating the state with all blocks between the current head and the
    /// sync head.
    ///
    /// Performs either full or snap sync, depending on the shared `snap_enabled` flag.
    /// - In full mode, all blocks are fetched via p2p eth requests and executed to rebuild the
    ///   state.
    /// - In snap mode, blocks and receipts are fetched and stored in parallel while the state is
    ///   fetched via p2p snap requests. After the sync cycle is complete, the sync mode will be
    ///   set to full.
    ///
    /// No error is returned. On failure, the error is recorded in the shared diagnostics and
    /// the phase is reset to `"idle"`, as on success. Recoverable errors are then logged as a
    /// warning so a new cycle can be started. Irrecoverable errors are logged and terminate the
    /// process with exit code 2.
    ///
    /// [WARNING] Sync is done optimistically, so headers and bodies may be stored even if their
    /// data has not been fully synced if the sync is aborted halfway.
    ///
    /// [WARNING] Sync is currently simplified and will not download bodies + receipts previous
    /// to the pivot during snap sync.
    pub async fn start_sync(&mut self, sync_head: H256, store: Store) {
        let start_time = Instant::now();

        let outcome = self.sync_cycle(sync_head, store).await;
        let Err(error) = outcome else {
            self.diagnostics.write().await.current_phase = "idle".to_string();
            info!(
                time_elapsed_s = start_time.elapsed().as_secs(),
                %sync_head,
                "Sync cycle finished successfully",
            );
            return;
        };

        let recoverable = error.is_recoverable();
        debug!(
            error_type = %error,
            recoverable = recoverable,
            action = if recoverable { "retry" } else { "exit" },
            "Sync cycle error classification"
        );

        // Reset the phase and record the error under a single write lock, so diagnostics
        // readers never observe an "idle" phase without the error that caused it.
        {
            let mut diagnostics = self.diagnostics.write().await;
            diagnostics.current_phase = "idle".to_string();
            diagnostics.push_error(SyncErrorEvent {
                timestamp: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs(),
                error_type: format!("{:?}", std::mem::discriminant(&error)),
                error_message: error.to_string(),
                recoverable,
            });
        }

        if !recoverable {
            // We exit the node, as we can't recover this error
            error!(
                time_elapsed_s = start_time.elapsed().as_secs(),
                %sync_head,
                %error, "Sync cycle failed, exiting as the error is irrecoverable",
            );
            std::process::exit(2);
        }

        // The error is recoverable: log it and let the caller start a new cycle.
        warn!(
            time_elapsed_s = start_time.elapsed().as_secs(),
            %sync_head,
            %error, "Sync cycle failed, retrying",
        );
    }

    /// Performs the sync cycle described in `start_sync`.
    ///
    /// Returns an error if the sync fails at any step. Suppose snap sync is enabled and the
    /// sync head is known to be below `MIN_FULL_BLOCKS`. Then snap sync is disabled, any stale
    /// snap checkpoint is cleared, and this cycle runs as a full sync instead.
    async fn sync_cycle(&mut self, sync_head: H256, store: Store) -> Result<(), SyncError> {
        // Take a picture of the current sync mode; the flag is only updated by the cycle itself.
        if self.snap_enabled.load(Ordering::Relaxed) {
            // Probe the sync head's block number before committing to snap sync.
            // On a fresh devnet the chain head may be only a few blocks deep. The
            // in-loop `head_close_to_0` guard in `sync_cycle_snap` (snap_sync.rs, same
            // `< MIN_FULL_BLOCKS` check) is only reached after a successful header batch,
            // which can stall when peers are barely synced themselves; pre-checking avoids
            // that. If the probe is inconclusive or the head is deep enough, snap sync
            // proceeds and re-fetches headers itself, keeping its entry simple.
            let probed_number = probe_sync_head_number(&mut self.peers, sync_head).await;
            let Some(sync_head_number) =
                probed_number.filter(|number| *number < MIN_FULL_BLOCKS)
            else {
                return self.run_snap_cycle(sync_head, store).await;
            };

            info!(
                sync_head_number,
                "Sync head below MIN_FULL_BLOCKS ({MIN_FULL_BLOCKS}), using full sync"
            );
            self.snap_enabled.store(false, Ordering::Relaxed);
            // Clear any stale snap checkpoint so the manager loop in
            // `sync_manager.rs` doesn't keep re-entering this branch
            // after the full sync completes. Mirrors the cleanup done
            // when the manager auto-switches to full on startup.
            if let Err(e) = store.clear_snap_state().await {
                warn!("Failed to clear stale snap state: {e}");
            }
        }

        // Single full-sync call site, shared by "snap disabled" and "head too shallow for snap".
        full::sync_cycle_full(
            &mut self.peers,
            self.blockchain.clone(),
            self.cancel_token.clone(),
            sync_head,
            store,
            &self.diagnostics,
        )
        .await
    }

    /// Runs one snap sync cycle, with the snap metrics enabled for its duration.
    ///
    /// The leaves folder under `datadir` is emptied first, because the snap algorithm currently
    /// assumes it starts empty.
    async fn run_snap_cycle(&mut self, sync_head: H256, store: Store) -> Result<(), SyncError> {
        METRICS.enable().await;
        delete_leaves_folder(&self.datadir);
        let sync_cycle_result = snap_sync::sync_cycle_snap(
            &mut self.peers,
            self.blockchain.clone(),
            &self.snap_enabled,
            sync_head,
            store,
            &self.datadir,
            &self.diagnostics,
        )
        .await;
        METRICS.disable().await;
        sync_cycle_result
    }
}

/// How many times the sync head's header is requested for the snap-vs-full pre-check.
const PROBE_SYNC_HEAD_ATTEMPTS: u32 = 3;
/// Pause between two consecutive probe attempts (two seconds).
const PROBE_SYNC_HEAD_RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(2_000);

/// Fetches the block header for `sync_head` and returns its block number.
///
/// Up to `PROBE_SYNC_HEAD_ATTEMPTS` requests are made, with a `PROBE_SYNC_HEAD_RETRY_DELAY`
/// pause between consecutive attempts. `None` means no attempt produced the requested header.
/// Callers should treat that as "couldn't decide" and fall through to the regular sync path.
///
/// Worst-case latency budget: `PROBE_SYNC_HEAD_ATTEMPTS` × `PEER_REPLY_TIMEOUT`
/// (5s, from `snap/constants.rs`) + (`PROBE_SYNC_HEAD_ATTEMPTS` − 1) ×
/// `PROBE_SYNC_HEAD_RETRY_DELAY` = ~19s on a peer-starved network before we
/// fall through to the snap path. On a healthy network the first attempt
/// usually returns in well under a second.
async fn probe_sync_head_number(peers: &mut PeerHandler, sync_head: H256) -> Option<u64> {
    for attempt in 1..=PROBE_SYNC_HEAD_ATTEMPTS {
        // Back off before every retry; the first request goes out immediately.
        if attempt > 1 {
            tokio::time::sleep(PROBE_SYNC_HEAD_RETRY_DELAY).await;
        }

        let response = peers
            .request_block_headers_from_hash(sync_head, BlockRequestOrder::NewToOld)
            .await;

        match response {
            Ok(HeaderFetchOutcome::Headers(headers)) => {
                // The response may hold several headers; only the one whose hash matches
                // the requested sync head counts.
                let target_number = headers
                    .iter()
                    .find(|candidate| candidate.hash() == sync_head)
                    .map(|candidate| candidate.number);
                if let Some(number) = target_number {
                    return Some(number);
                }
                debug!("Sync head probe: response did not contain target header");
            }
            Ok(other_outcome) => {
                debug!(
                    reason = other_outcome.failure_reason(),
                    "Sync head probe attempt {attempt}/{PROBE_SYNC_HEAD_ATTEMPTS}: no headers"
                );
            }
            Err(request_error) => {
                warn!(
                    "Sync head probe attempt {attempt}/{PROBE_SYNC_HEAD_ATTEMPTS} failed: {request_error}"
                );
            }
        }
    }
    None
}

#[derive(Debug, Default)]
// The nested tuple type of `accounts_with_storage_root` trips clippy's `type_complexity`
// lint; it is allowed here instead of introducing a type alias.
#[allow(clippy::type_complexity)]
/// We store for optimization the accounts that need to heal storage.
///
/// Accounts move conceptually from `accounts_with_storage_root` (original storage root still
/// trustworthy) to `healed_accounts` (root may have changed).
/// NOTE(review): the code that performs this move is outside this file; confirm.
pub struct AccountStorageRoots {
    /// The accounts that have not been healed are guaranteed to have the original storage root,
    /// so we can read this storage root from here. Keyed by account hash.
    /// NOTE(review): the meaning of the `Option<H256>` and of the `Vec<(H256, H256)>` pairs is
    /// not visible in this file; confirm against the snap/healing code.
    pub accounts_with_storage_root: BTreeMap<H256, (Option<H256>, Vec<(H256, H256)>)>,
    /// If an account has been healed, it may return to a previous state, so its storage root
    /// can no longer be trusted and we just store the account hash in a hashset.
    pub healed_accounts: HashSet<H256>,
}

/// Errors that can abort a sync cycle.
///
/// `SyncError::is_recoverable` classifies every variant. On a recoverable error,
/// `Syncer::start_sync` logs a warning and returns so a new cycle can be started. On an
/// irrecoverable one, it terminates the process.
#[derive(thiserror::Error, Debug)]
pub enum SyncError {
    // Errors wrapped from other layers; `#[from]` lets `?` convert them automatically.
    #[error(transparent)]
    Chain(#[from] ChainError),
    #[error(transparent)]
    Store(#[from] StoreError),
    /// A channel send failed. Holds the rendered `SendError` message; see the
    /// `From<SendError<T>>` impl.
    #[error("{0}")]
    Send(String),
    #[error(transparent)]
    Trie(#[from] TrieError),
    #[error(transparent)]
    Rlp(#[from] RLPDecodeError),
    /// A spawned tokio task failed to complete (panicked or was cancelled).
    #[error(transparent)]
    JoinHandle(#[from] tokio::task::JoinError),
    // Missing or inconsistent chain data, either local or received from peers.
    #[error("Missing data from DB")]
    CorruptDB,
    #[error("Failed to fetch latest canonical block, unable to sync")]
    NoLatestCanonical,
    #[error("Range received is invalid")]
    InvalidRangeReceived,
    #[error("Failed to fetch block number for head {0}")]
    BlockNumber(H256),
    #[error("No blocks found")]
    NoBlocks,
    // Snap-sync snapshot files on disk: read/decode failures and missing directories.
    #[error("Failed to read snapshot from {0:?} with error {1:?}")]
    SnapshotReadError(PathBuf, std::io::Error),
    #[error("Failed to RLP decode account_state_snapshot from {0:?}")]
    SnapshotDecodeError(PathBuf),
    #[error("Failed to RLP decode code_hashes_snapshot from {0:?}")]
    CodeHashesSnapshotDecodeError(PathBuf),
    #[error("Failed to get account state for block {0:?} and account hash {1:?}")]
    AccountState(H256, H256),
    #[error("Failed to fetch bytecodes from peers")]
    BytecodesNotFound,
    #[error("Failed to get account state snapshots directory")]
    AccountStateSnapshotsDirNotFound,
    #[error("Failed to get account storages snapshots directory")]
    AccountStoragesSnapshotsDirNotFound,
    #[error("Failed to get code hashes snapshots directory")]
    CodeHashesSnapshotsDirNotFound,
    /// A computed state root did not match the expected one. The fields are, in order: the
    /// account hash, the expected root and the computed root.
    #[error("Got different state roots for account hash: {0:?}, expected: {1:?}, computed: {2:?}")]
    DifferentStateRoots(H256, H256, H256),
    #[error("Failed to get block headers")]
    NoBlockHeaders,
    /// Peer-handler failure. Its recoverability is delegated to `PeerHandlerError`.
    #[error("Peer handler error: {0}")]
    PeerHandler(#[from] PeerHandlerError),
    #[error("Parent not found in healing queue. Parent: {0}, path: {1}")]
    HealingQueueInconsistency(String, String),
    // Local filesystem / temporary database failures.
    #[error("Filesystem error: {0}")]
    FileSystem(String),
    #[error("Sorted Trie Generation Error: {0}")]
    TrieGenerationError(#[from] TrieGenerationError),
    #[error("Failed to get account temp db directory: {0}")]
    AccountTempDBDirNotFound(String),
    #[error("Failed to get storage temp db directory: {0}")]
    StorageTempDBDirNotFound(String),
    #[error("RocksDB Error: {0}")]
    RocksDBError(String),
    #[error("Bytecode file error")]
    BytecodeFileError,
    /// Peer-table actor failure. A request timeout is retried; a stopped actor is fatal.
    #[error("Error in Peer Table: {0}")]
    PeerTableError(#[from] ActorError),
    #[error("Missing fullsync batch")]
    MissingFullsyncBatch,
    #[error("Snap error: {0}")]
    Snap(#[from] crate::snap::SnapError),
}

impl SyncError {
    /// Returns whether a sync cycle that failed with this error may be retried.
    ///
    /// `Syncer::start_sync` keeps the node running on `true` and exits the process on
    /// `false`. The match is exhaustive on purpose, so adding a variant forces a decision here.
    pub fn is_recoverable(&self) -> bool {
        match self {
            // PeerHandler delegates to its own classification so that transient
            // peer/network errors retry while structural errors (dead actor,
            // local storage full) still exit.
            SyncError::PeerHandler(e) => e.is_recoverable(),
            SyncError::SnapshotReadError(_, _)
            | SyncError::SnapshotDecodeError(_)
            | SyncError::CodeHashesSnapshotDecodeError(_)
            | SyncError::AccountState(_, _)
            | SyncError::BytecodesNotFound
            | SyncError::AccountStateSnapshotsDirNotFound
            | SyncError::AccountStoragesSnapshotsDirNotFound
            | SyncError::CodeHashesSnapshotsDirNotFound
            | SyncError::DifferentStateRoots(_, _, _)
            | SyncError::HealingQueueInconsistency(_, _)
            | SyncError::TrieGenerationError(_)
            | SyncError::AccountTempDBDirNotFound(_)
            | SyncError::StorageTempDBDirNotFound(_)
            | SyncError::RocksDBError(_)
            | SyncError::BytecodeFileError
            | SyncError::NoLatestCanonical
            | SyncError::MissingFullsyncBatch
            | SyncError::Snap(_)
            | SyncError::FileSystem(_) => false,
            // A timed-out actor request is transient (mailbox pressure or a
            // slow handler — requests use spawned-concurrency's 5s default
            // timeout); a stopped actor means p2p is shutting down and must
            // stay fatal.
            SyncError::PeerTableError(ActorError::RequestTimeout) => true,
            SyncError::PeerTableError(ActorError::ActorStopped) => false,
            SyncError::Chain(_)
            | SyncError::Store(_)
            | SyncError::Send(_)
            | SyncError::Trie(_)
            | SyncError::Rlp(_)
            | SyncError::JoinHandle(_)
            | SyncError::CorruptDB
            | SyncError::InvalidRangeReceived
            | SyncError::BlockNumber(_)
            | SyncError::NoBlocks
            | SyncError::NoBlockHeaders => true,
        }
    }
}

impl<T> From<SendError<T>> for SyncError {
    fn from(value: SendError<T>) -> Self {
        Self::Send(value.to_string())
    }
}