1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
use std::{
path::PathBuf,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};
use ethrex_blockchain::Blockchain;
use ethrex_common::H256;
use ethrex_storage::Store;
use tokio::{
sync::Mutex,
time::{Duration, sleep},
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use crate::{
peer_handler::PeerHandler,
sync::{SyncDiagnostics, SyncMode, Syncer},
};
/// Abstraction to interact with the active sync process without disturbing it
#[derive(Debug)]
pub struct SyncManager {
/// Whether snap sync is enabled. This flag is also held by the Syncer (a clone of the
/// `Arc` is passed to `Syncer::new`), which allows tracking its latest syncmode.
/// Treat it as read-only: modifications may disrupt the current active sync progress.
/// NOTE(review): it is still written in `new` (before the Syncer exists) and by `disable_snap`.
snap_enabled: Arc<AtomicBool>,
/// The syncer itself. The background sync task keeps this locked for the whole duration of a
/// sync, so a failed `try_lock` is used as the "sync is active" signal.
syncer: Arc<Mutex<Syncer>>,
/// Latest forkchoice-update head received. Starts as `H256::zero()`, which the sync task
/// treats as "no fcu received yet".
last_fcu_head: Arc<Mutex<H256>>,
/// Storage handle, used to read/clear snap sync checkpoints and chain state.
store: Store,
/// Sync diagnostics shared with the Syncer, which receives a clone of this `Arc`.
diagnostics: Arc<tokio::sync::RwLock<SyncDiagnostics>>,
}
impl SyncManager {
pub async fn new(
peer_handler: PeerHandler,
sync_mode: &SyncMode,
cancel_token: CancellationToken,
blockchain: Arc<Blockchain>,
store: Store,
datadir: PathBuf,
) -> Self {
let snap_enabled = Arc::new(AtomicBool::new(matches!(sync_mode, SyncMode::Snap)));
// Fetch checkpoint once to avoid duplicate DB reads
let has_checkpoint = store
.get_header_download_checkpoint()
.await
.unwrap_or_else(|e| {
warn!("Failed to read header download checkpoint: {e}");
None
})
.is_some();
// Auto-switch from snap to full sync if node already has synced state.
// For post-merge networks (terminal_total_difficulty_passed), any stored
// block > 0 means the node has previously synced. For pre-merge networks,
// use merge_netsplit_block as threshold to avoid false positives in hive tests.
if snap_enabled.load(Ordering::Relaxed) {
let latest_block = store.get_latest_block_number().await.unwrap_or(0);
let chain_config = store.get_chain_config();
let is_synced = if chain_config.terminal_total_difficulty_passed {
latest_block > 0
} else if let Some(merge_block) = chain_config.merge_netsplit_block {
latest_block > merge_block
} else {
false
};
if is_synced {
info!("Node has synced state (block {latest_block}), switching to full sync");
snap_enabled.store(false, Ordering::Relaxed);
if has_checkpoint && let Err(e) = store.clear_snap_state().await {
warn!("Failed to clear stale snap state: {e}");
}
}
}
let diagnostics = Arc::new(tokio::sync::RwLock::new(SyncDiagnostics::default()));
let syncer = Arc::new(Mutex::new(Syncer::new(
peer_handler,
snap_enabled.clone(),
cancel_token,
blockchain,
datadir,
diagnostics.clone(),
)));
let sync_manager = Self {
snap_enabled,
syncer,
last_fcu_head: Arc::new(Mutex::new(H256::zero())),
store: store.clone(),
diagnostics,
};
// If the node was in the middle of a sync and then re-started we must resume syncing
// Otherwise we will incorreclty assume the node is already synced and work on invalid state
// Skip if the auto-switch already transitioned to full sync (snap_enabled is now false)
if has_checkpoint && sync_manager.snap_enabled.load(Ordering::Relaxed) {
sync_manager.start_sync();
}
sync_manager
}
/// Sets the latest fcu head and starts the next sync cycle if the syncer is currently inactive
pub fn sync_to_head(&self, fcu_head: H256) {
self.set_head(fcu_head);
if !self.is_active() {
self.start_sync();
}
}
/// Returns the syncer's current syncmode (either snap or full)
pub fn sync_mode(&self) -> SyncMode {
if self.snap_enabled.load(Ordering::Relaxed) {
SyncMode::Snap
} else {
SyncMode::Full
}
}
/// Disables snapsync mode
pub fn disable_snap(&self) {
self.snap_enabled.store(false, Ordering::Relaxed);
}
/// Returns a snapshot of the current sync diagnostics with live values.
pub async fn get_sync_diagnostics(&self) -> SyncDiagnostics {
use crate::metrics::METRICS;
use std::sync::atomic::Ordering::Relaxed;
let mut diag = self.diagnostics.read().await.clone();
// Compute live pivot age
if let Some(ts) = diag.pivot_timestamp {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
diag.pivot_age_seconds = Some(now.saturating_sub(ts));
}
// Populate live progress from METRICS atomics
let headers = METRICS.downloaded_headers.get();
let accounts_downloaded = METRICS.downloaded_account_tries.load(Relaxed);
let accounts_inserted = METRICS.account_tries_inserted.load(Relaxed);
let storage_downloaded = METRICS.storage_leaves_downloaded.get();
let storage_inserted = METRICS.storage_leaves_inserted.get();
if headers > 0 {
diag.phase_progress
.insert("headers_downloaded".into(), headers);
}
if accounts_downloaded > 0 {
diag.phase_progress
.insert("accounts_downloaded".into(), accounts_downloaded);
}
if accounts_inserted > 0 {
diag.phase_progress
.insert("accounts_inserted".into(), accounts_inserted);
}
if storage_downloaded > 0 {
diag.phase_progress
.insert("storage_slots_downloaded".into(), storage_downloaded);
}
if storage_inserted > 0 {
diag.phase_progress
.insert("storage_slots_inserted".into(), storage_inserted);
}
diag
}
/// Returns a reference to the diagnostics RwLock for updating from the sync code.
pub fn diagnostics(&self) -> &Arc<tokio::sync::RwLock<SyncDiagnostics>> {
&self.diagnostics
}
/// Updates the last fcu head. This may be used on the next sync cycle if needed
fn set_head(&self, fcu_head: H256) {
if let Ok(mut latest_fcu_head) = self.last_fcu_head.try_lock() {
*latest_fcu_head = fcu_head;
} else {
warn!("Failed to update latest fcu head for syncing")
}
}
/// Returns true is the syncer is active
fn is_active(&self) -> bool {
self.syncer.try_lock().is_err()
}
/// Attempts to sync to the last received fcu head
/// Will do nothing if the syncer is already involved in a sync process
/// If the sync process would require multiple sync cycles (such as snap sync), starts all required sync cycles until the sync is complete
fn start_sync(&self) {
let syncer = self.syncer.clone();
let store = self.store.clone();
let sync_head = self.last_fcu_head.clone();
tokio::spawn(async move {
// If we can't get hold of the syncer, then it means that there is an active sync in process
let Ok(mut syncer) = syncer.try_lock() else {
return;
};
let mut waiting_for_fcu_logged = false;
loop {
let sync_head = {
// Read latest fcu head without holding the lock for longer than needed
let Ok(sync_head) = sync_head.try_lock() else {
error!("Failed to read latest fcu head, unable to sync");
return;
};
*sync_head
};
// Edge case: If we are resuming a sync process after a node restart, wait until the next fcu to start
if sync_head.is_zero() {
if waiting_for_fcu_logged {
debug!(
"Still waiting for a forkchoice update from the consensus client to resume sync"
);
} else {
info!(
"Resuming sync after node restart, waiting for a forkchoice update from the consensus client"
);
waiting_for_fcu_logged = true;
}
sleep(Duration::from_secs(5)).await;
continue;
}
// Start the sync cycle
syncer.start_sync(sync_head, store.clone()).await;
// Continue to the next sync cycle if we have an ongoing snap sync (aka if we still have snap sync checkpoints stored)
if store
.get_header_download_checkpoint()
.await
.ok()
.flatten()
.is_none()
{
break;
}
}
});
}
pub fn get_last_fcu_head(&self) -> Result<H256, tokio::sync::TryLockError> {
Ok(*self.last_fcu_head.try_lock()?)
}
}