Skip to main content

ethrex_p2p/
sync_manager.rs

1use std::{
2    path::PathBuf,
3    sync::{
4        Arc,
5        atomic::{AtomicBool, Ordering},
6    },
7};
8
9use ethrex_blockchain::Blockchain;
10use ethrex_common::H256;
11use ethrex_storage::Store;
12use tokio::{
13    sync::Mutex,
14    time::{Duration, sleep},
15};
16use tokio_util::sync::CancellationToken;
17use tracing::{debug, error, info, warn};
18
19use crate::{
20    peer_handler::PeerHandler,
21    sync::{SyncDiagnostics, SyncMode, Syncer},
22};
23
/// Abstraction to interact with the active sync process without disturbing it
#[derive(Debug)]
pub struct SyncManager {
    /// This is also held by the Syncer and allows tracking its latest syncmode
    /// It is a READ_ONLY value, as modifications will disrupt the current active sync progress
    snap_enabled: Arc<AtomicBool>,
    /// The syncer driving the sync cycles. The sync task keeps this mutex locked while it
    /// runs, so a failed `try_lock` is how the manager detects an active sync.
    syncer: Arc<Mutex<Syncer>>,
    /// Head hash of the latest forkchoice update received. Starts as the zero hash.
    last_fcu_head: Arc<Mutex<H256>>,
    /// Store handle handed to every sync cycle and used to read the header download checkpoint
    store: Store,
    /// Sync diagnostics, shared with the Syncer
    diagnostics: Arc<tokio::sync::RwLock<SyncDiagnostics>>,
}
35
36impl SyncManager {
37    pub async fn new(
38        peer_handler: PeerHandler,
39        sync_mode: &SyncMode,
40        cancel_token: CancellationToken,
41        blockchain: Arc<Blockchain>,
42        store: Store,
43        datadir: PathBuf,
44    ) -> Self {
45        let snap_enabled = Arc::new(AtomicBool::new(matches!(sync_mode, SyncMode::Snap)));
46
47        // Fetch checkpoint once to avoid duplicate DB reads
48        let has_checkpoint = store
49            .get_header_download_checkpoint()
50            .await
51            .unwrap_or_else(|e| {
52                warn!("Failed to read header download checkpoint: {e}");
53                None
54            })
55            .is_some();
56
57        // Auto-switch from snap to full sync if node already has synced state.
58        // For post-merge networks (terminal_total_difficulty_passed), any stored
59        // block > 0 means the node has previously synced. For pre-merge networks,
60        // use merge_netsplit_block as threshold to avoid false positives in hive tests.
61        if snap_enabled.load(Ordering::Relaxed) {
62            let latest_block = store.get_latest_block_number().await.unwrap_or(0);
63            let chain_config = store.get_chain_config();
64            let is_synced = if chain_config.terminal_total_difficulty_passed {
65                latest_block > 0
66            } else if let Some(merge_block) = chain_config.merge_netsplit_block {
67                latest_block > merge_block
68            } else {
69                false
70            };
71            if is_synced {
72                info!("Node has synced state (block {latest_block}), switching to full sync");
73                snap_enabled.store(false, Ordering::Relaxed);
74                if has_checkpoint && let Err(e) = store.clear_snap_state().await {
75                    warn!("Failed to clear stale snap state: {e}");
76                }
77            }
78        }
79
80        let diagnostics = Arc::new(tokio::sync::RwLock::new(SyncDiagnostics::default()));
81        let syncer = Arc::new(Mutex::new(Syncer::new(
82            peer_handler,
83            snap_enabled.clone(),
84            cancel_token,
85            blockchain,
86            datadir,
87            diagnostics.clone(),
88        )));
89        let sync_manager = Self {
90            snap_enabled,
91            syncer,
92            last_fcu_head: Arc::new(Mutex::new(H256::zero())),
93            store: store.clone(),
94            diagnostics,
95        };
96        // If the node was in the middle of a sync and then re-started we must resume syncing
97        // Otherwise we will incorreclty assume the node is already synced and work on invalid state
98        // Skip if the auto-switch already transitioned to full sync (snap_enabled is now false)
99        if has_checkpoint && sync_manager.snap_enabled.load(Ordering::Relaxed) {
100            sync_manager.start_sync();
101        }
102        sync_manager
103    }
104
105    /// Sets the latest fcu head and starts the next sync cycle if the syncer is currently inactive
106    pub fn sync_to_head(&self, fcu_head: H256) {
107        self.set_head(fcu_head);
108        if !self.is_active() {
109            self.start_sync();
110        }
111    }
112
113    /// Returns the syncer's current syncmode (either snap or full)
114    pub fn sync_mode(&self) -> SyncMode {
115        if self.snap_enabled.load(Ordering::Relaxed) {
116            SyncMode::Snap
117        } else {
118            SyncMode::Full
119        }
120    }
121
122    /// Disables snapsync mode
123    pub fn disable_snap(&self) {
124        self.snap_enabled.store(false, Ordering::Relaxed);
125    }
126
127    /// Returns a snapshot of the current sync diagnostics with live values.
128    pub async fn get_sync_diagnostics(&self) -> SyncDiagnostics {
129        use crate::metrics::METRICS;
130        use std::sync::atomic::Ordering::Relaxed;
131
132        let mut diag = self.diagnostics.read().await.clone();
133
134        // Compute live pivot age
135        if let Some(ts) = diag.pivot_timestamp {
136            let now = std::time::SystemTime::now()
137                .duration_since(std::time::UNIX_EPOCH)
138                .unwrap_or_default()
139                .as_secs();
140            diag.pivot_age_seconds = Some(now.saturating_sub(ts));
141        }
142
143        // Populate live progress from METRICS atomics
144        let headers = METRICS.downloaded_headers.get();
145        let accounts_downloaded = METRICS.downloaded_account_tries.load(Relaxed);
146        let accounts_inserted = METRICS.account_tries_inserted.load(Relaxed);
147        let storage_downloaded = METRICS.storage_leaves_downloaded.get();
148        let storage_inserted = METRICS.storage_leaves_inserted.get();
149
150        if headers > 0 {
151            diag.phase_progress
152                .insert("headers_downloaded".into(), headers);
153        }
154        if accounts_downloaded > 0 {
155            diag.phase_progress
156                .insert("accounts_downloaded".into(), accounts_downloaded);
157        }
158        if accounts_inserted > 0 {
159            diag.phase_progress
160                .insert("accounts_inserted".into(), accounts_inserted);
161        }
162        if storage_downloaded > 0 {
163            diag.phase_progress
164                .insert("storage_slots_downloaded".into(), storage_downloaded);
165        }
166        if storage_inserted > 0 {
167            diag.phase_progress
168                .insert("storage_slots_inserted".into(), storage_inserted);
169        }
170
171        diag
172    }
173
174    /// Returns a reference to the diagnostics RwLock for updating from the sync code.
175    pub fn diagnostics(&self) -> &Arc<tokio::sync::RwLock<SyncDiagnostics>> {
176        &self.diagnostics
177    }
178
179    /// Updates the last fcu head. This may be used on the next sync cycle if needed
180    fn set_head(&self, fcu_head: H256) {
181        if let Ok(mut latest_fcu_head) = self.last_fcu_head.try_lock() {
182            *latest_fcu_head = fcu_head;
183        } else {
184            warn!("Failed to update latest fcu head for syncing")
185        }
186    }
187
188    /// Returns true is the syncer is active
189    fn is_active(&self) -> bool {
190        self.syncer.try_lock().is_err()
191    }
192
193    /// Attempts to sync to the last received fcu head
194    /// Will do nothing if the syncer is already involved in a sync process
195    /// If the sync process would require multiple sync cycles (such as snap sync), starts all required sync cycles until the sync is complete
196    fn start_sync(&self) {
197        let syncer = self.syncer.clone();
198        let store = self.store.clone();
199        let sync_head = self.last_fcu_head.clone();
200
201        tokio::spawn(async move {
202            // If we can't get hold of the syncer, then it means that there is an active sync in process
203            let Ok(mut syncer) = syncer.try_lock() else {
204                return;
205            };
206            let mut waiting_for_fcu_logged = false;
207            loop {
208                let sync_head = {
209                    // Read latest fcu head without holding the lock for longer than needed
210                    let Ok(sync_head) = sync_head.try_lock() else {
211                        error!("Failed to read latest fcu head, unable to sync");
212                        return;
213                    };
214                    *sync_head
215                };
216                // Edge case: If we are resuming a sync process after a node restart, wait until the next fcu to start
217                if sync_head.is_zero() {
218                    if waiting_for_fcu_logged {
219                        debug!(
220                            "Still waiting for a forkchoice update from the consensus client to resume sync"
221                        );
222                    } else {
223                        info!(
224                            "Resuming sync after node restart, waiting for a forkchoice update from the consensus client"
225                        );
226                        waiting_for_fcu_logged = true;
227                    }
228                    sleep(Duration::from_secs(5)).await;
229                    continue;
230                }
231                // Start the sync cycle
232                syncer.start_sync(sync_head, store.clone()).await;
233                // Continue to the next sync cycle if we have an ongoing snap sync (aka if we still have snap sync checkpoints stored)
234                if store
235                    .get_header_download_checkpoint()
236                    .await
237                    .ok()
238                    .flatten()
239                    .is_none()
240                {
241                    break;
242                }
243            }
244        });
245    }
246
247    pub fn get_last_fcu_head(&self) -> Result<H256, tokio::sync::TryLockError> {
248        Ok(*self.last_fcu_head.try_lock()?)
249    }
250}