ethrex_p2p/
sync_manager.rs1use std::{
2 path::PathBuf,
3 sync::{
4 Arc,
5 atomic::{AtomicBool, Ordering},
6 },
7};
8
9use ethrex_blockchain::Blockchain;
10use ethrex_common::H256;
11use ethrex_storage::Store;
12use tokio::{
13 sync::Mutex,
14 time::{Duration, sleep},
15};
16use tokio_util::sync::CancellationToken;
17use tracing::{debug, error, info, warn};
18
19use crate::{
20 peer_handler::PeerHandler,
21 sync::{SyncDiagnostics, SyncMode, Syncer},
22};
23
24#[derive(Debug)]
26pub struct SyncManager {
27 snap_enabled: Arc<AtomicBool>,
30 syncer: Arc<Mutex<Syncer>>,
31 last_fcu_head: Arc<Mutex<H256>>,
32 store: Store,
33 diagnostics: Arc<tokio::sync::RwLock<SyncDiagnostics>>,
34}
35
36impl SyncManager {
37 pub async fn new(
38 peer_handler: PeerHandler,
39 sync_mode: &SyncMode,
40 cancel_token: CancellationToken,
41 blockchain: Arc<Blockchain>,
42 store: Store,
43 datadir: PathBuf,
44 ) -> Self {
45 let snap_enabled = Arc::new(AtomicBool::new(matches!(sync_mode, SyncMode::Snap)));
46
47 let has_checkpoint = store
49 .get_header_download_checkpoint()
50 .await
51 .unwrap_or_else(|e| {
52 warn!("Failed to read header download checkpoint: {e}");
53 None
54 })
55 .is_some();
56
57 if snap_enabled.load(Ordering::Relaxed) {
62 let latest_block = store.get_latest_block_number().await.unwrap_or(0);
63 let chain_config = store.get_chain_config();
64 let is_synced = if chain_config.terminal_total_difficulty_passed {
65 latest_block > 0
66 } else if let Some(merge_block) = chain_config.merge_netsplit_block {
67 latest_block > merge_block
68 } else {
69 false
70 };
71 if is_synced {
72 info!("Node has synced state (block {latest_block}), switching to full sync");
73 snap_enabled.store(false, Ordering::Relaxed);
74 if has_checkpoint && let Err(e) = store.clear_snap_state().await {
75 warn!("Failed to clear stale snap state: {e}");
76 }
77 }
78 }
79
80 let diagnostics = Arc::new(tokio::sync::RwLock::new(SyncDiagnostics::default()));
81 let syncer = Arc::new(Mutex::new(Syncer::new(
82 peer_handler,
83 snap_enabled.clone(),
84 cancel_token,
85 blockchain,
86 datadir,
87 diagnostics.clone(),
88 )));
89 let sync_manager = Self {
90 snap_enabled,
91 syncer,
92 last_fcu_head: Arc::new(Mutex::new(H256::zero())),
93 store: store.clone(),
94 diagnostics,
95 };
96 if has_checkpoint && sync_manager.snap_enabled.load(Ordering::Relaxed) {
100 sync_manager.start_sync();
101 }
102 sync_manager
103 }
104
105 pub fn sync_to_head(&self, fcu_head: H256) {
107 self.set_head(fcu_head);
108 if !self.is_active() {
109 self.start_sync();
110 }
111 }
112
113 pub fn sync_mode(&self) -> SyncMode {
115 if self.snap_enabled.load(Ordering::Relaxed) {
116 SyncMode::Snap
117 } else {
118 SyncMode::Full
119 }
120 }
121
122 pub fn disable_snap(&self) {
124 self.snap_enabled.store(false, Ordering::Relaxed);
125 }
126
127 pub async fn get_sync_diagnostics(&self) -> SyncDiagnostics {
129 use crate::metrics::METRICS;
130 use std::sync::atomic::Ordering::Relaxed;
131
132 let mut diag = self.diagnostics.read().await.clone();
133
134 if let Some(ts) = diag.pivot_timestamp {
136 let now = std::time::SystemTime::now()
137 .duration_since(std::time::UNIX_EPOCH)
138 .unwrap_or_default()
139 .as_secs();
140 diag.pivot_age_seconds = Some(now.saturating_sub(ts));
141 }
142
143 let headers = METRICS.downloaded_headers.get();
145 let accounts_downloaded = METRICS.downloaded_account_tries.load(Relaxed);
146 let accounts_inserted = METRICS.account_tries_inserted.load(Relaxed);
147 let storage_downloaded = METRICS.storage_leaves_downloaded.get();
148 let storage_inserted = METRICS.storage_leaves_inserted.get();
149
150 if headers > 0 {
151 diag.phase_progress
152 .insert("headers_downloaded".into(), headers);
153 }
154 if accounts_downloaded > 0 {
155 diag.phase_progress
156 .insert("accounts_downloaded".into(), accounts_downloaded);
157 }
158 if accounts_inserted > 0 {
159 diag.phase_progress
160 .insert("accounts_inserted".into(), accounts_inserted);
161 }
162 if storage_downloaded > 0 {
163 diag.phase_progress
164 .insert("storage_slots_downloaded".into(), storage_downloaded);
165 }
166 if storage_inserted > 0 {
167 diag.phase_progress
168 .insert("storage_slots_inserted".into(), storage_inserted);
169 }
170
171 diag
172 }
173
174 pub fn diagnostics(&self) -> &Arc<tokio::sync::RwLock<SyncDiagnostics>> {
176 &self.diagnostics
177 }
178
179 fn set_head(&self, fcu_head: H256) {
181 if let Ok(mut latest_fcu_head) = self.last_fcu_head.try_lock() {
182 *latest_fcu_head = fcu_head;
183 } else {
184 warn!("Failed to update latest fcu head for syncing")
185 }
186 }
187
188 fn is_active(&self) -> bool {
190 self.syncer.try_lock().is_err()
191 }
192
193 fn start_sync(&self) {
197 let syncer = self.syncer.clone();
198 let store = self.store.clone();
199 let sync_head = self.last_fcu_head.clone();
200
201 tokio::spawn(async move {
202 let Ok(mut syncer) = syncer.try_lock() else {
204 return;
205 };
206 let mut waiting_for_fcu_logged = false;
207 loop {
208 let sync_head = {
209 let Ok(sync_head) = sync_head.try_lock() else {
211 error!("Failed to read latest fcu head, unable to sync");
212 return;
213 };
214 *sync_head
215 };
216 if sync_head.is_zero() {
218 if waiting_for_fcu_logged {
219 debug!(
220 "Still waiting for a forkchoice update from the consensus client to resume sync"
221 );
222 } else {
223 info!(
224 "Resuming sync after node restart, waiting for a forkchoice update from the consensus client"
225 );
226 waiting_for_fcu_logged = true;
227 }
228 sleep(Duration::from_secs(5)).await;
229 continue;
230 }
231 syncer.start_sync(sync_head, store.clone()).await;
233 if store
235 .get_header_download_checkpoint()
236 .await
237 .ok()
238 .flatten()
239 .is_none()
240 {
241 break;
242 }
243 }
244 });
245 }
246
247 pub fn get_last_fcu_head(&self) -> Result<H256, tokio::sync::TryLockError> {
248 Ok(*self.last_fcu_head.try_lock()?)
249 }
250}