Skip to main content

ethrex_p2p/
network.rs

1#[cfg(feature = "l2")]
2use crate::rlpx::l2::l2_connection::P2PBasedContext;
/// Stand-in for the L2 based-sequencing context, compiled only when the `l2` feature is off.
///
/// With `l2` enabled the real `P2PBasedContext` is imported from
/// `crate::rlpx::l2::l2_connection` instead. Having a type of the same name in both
/// configurations lets `P2PContext::new` keep one signature regardless of the feature.
#[cfg(not(feature = "l2"))]
#[derive(Clone, Debug)]
pub struct P2PBasedContext;
6use crate::{
7    discovery::{DiscoveryConfig, DiscoveryServer, DiscoveryServerError},
8    metrics::{CurrentStepValue, METRICS},
9    peer_table::{PeerData, PeerTable, PeerTableServerProtocol as _},
10    rlpx::{
11        connection::server::{PeerConnBroadcastSender, PeerConnection},
12        message::Message,
13        p2p::SUPPORTED_SNAP_CAPABILITIES,
14    },
15    tx_broadcaster::{TxBroadcaster, TxBroadcasterError},
16    types::{NetworkConfig, Node},
17};
18use ethrex_blockchain::Blockchain;
19use ethrex_common::H256;
20use ethrex_storage::Store;
21use secp256k1::SecretKey;
22use spawned_concurrency::tasks::ActorRef;
23use std::{
24    io,
25    net::SocketAddr,
26    sync::{Arc, atomic::Ordering},
27    time::Duration,
28};
29use tokio::net::{TcpListener, TcpSocket, UdpSocket};
30use tokio_util::task::TaskTracker;
31use tracing::{error, info};
32
/// Capacity handed to the `tokio::sync::broadcast` channel created in `P2PContext::new`.
pub const MAX_MESSAGES_TO_BROADCAST: usize = 100_000;
34
/// Shared state handed to the P2P subsystems (discovery server, TCP accept loop, peer
/// connections). The struct is `Clone`, and `start_network` / `serve_p2p_requests` clone it
/// for each spawned task and accepted connection.
#[derive(Clone, Debug)]
pub struct P2PContext {
    /// Task tracker on which `start_network` spawns the TCP accept loop.
    pub tracker: TaskTracker,
    /// Secret key of the local node; passed to the discovery server on startup.
    pub signer: SecretKey,
    /// Handle to the peer table, also given to the discovery server and the tx broadcaster.
    pub table: PeerTable,
    /// Storage handle; passed to the discovery server on startup.
    pub storage: Store,
    /// Blockchain handle; a clone is given to the tx broadcaster in `P2PContext::new`.
    pub blockchain: Arc<Blockchain>,
    /// Sender half of the broadcast channel carrying `(tokio::task::Id, Arc<Message>)`
    /// pairs, created with capacity `MAX_MESSAGES_TO_BROADCAST`.
    pub(crate) broadcast: PeerConnBroadcastSender,
    /// Description of the local node. Its `tcp_addr()` is compared against the address of
    /// each inbound connection so connections from ourselves are skipped.
    pub local_node: Node,
    /// Network addressing configuration: bind vs. external addresses.
    pub network_config: NetworkConfig,
    /// Client version string.
    // NOTE(review): presumably advertised to peers during the handshake — confirm at use site.
    pub client_version: String,
    /// L2 based-sequencing context; the field only exists when the `l2` feature is enabled.
    #[cfg(feature = "l2")]
    pub based_context: Option<P2PBasedContext>,
    /// Handle to the `TxBroadcaster` actor spawned in `P2PContext::new`.
    pub tx_broadcaster: ActorRef<TxBroadcaster>,
    /// Value copied into `DiscoveryConfig::initial_lookup_interval` by `start_network`.
    // NOTE(review): unit is not visible in this file — confirm against `DiscoveryConfig`.
    pub initial_lookup_interval: f64,
}
52
53impl P2PContext {
54    #[allow(clippy::too_many_arguments)]
55    pub fn new(
56        local_node: Node,
57        network_config: NetworkConfig,
58        tracker: TaskTracker,
59        signer: SecretKey,
60        peer_table: PeerTable,
61        storage: Store,
62        blockchain: Arc<Blockchain>,
63        client_version: String,
64        based_context: Option<P2PBasedContext>,
65        tx_broadcasting_time_interval: u64,
66        lookup_interval: f64,
67    ) -> Result<Self, NetworkError> {
68        let (channel_broadcast_send_end, _) = tokio::sync::broadcast::channel::<(
69            tokio::task::Id,
70            Arc<Message>,
71        )>(MAX_MESSAGES_TO_BROADCAST);
72
73        let tx_broadcaster = TxBroadcaster::spawn(
74            peer_table.clone(),
75            blockchain.clone(),
76            tx_broadcasting_time_interval,
77        )
78        .inspect_err(|e| {
79            error!("Failed to start Tx Broadcaster: {e}");
80        })?;
81
82        #[cfg(not(feature = "l2"))]
83        let _ = &based_context;
84
85        Ok(P2PContext {
86            local_node,
87            network_config,
88            tracker,
89            signer,
90            table: peer_table,
91            storage,
92            blockchain,
93            broadcast: channel_broadcast_send_end,
94            client_version,
95            #[cfg(feature = "l2")]
96            based_context,
97            tx_broadcaster,
98            initial_lookup_interval: lookup_interval,
99        })
100    }
101}
102
/// Errors surfaced while constructing the P2P context or starting the network services.
#[derive(Debug, thiserror::Error)]
pub enum NetworkError {
    /// The discovery server could not be started (see `start_network`).
    #[error("Failed to start discovery server: {0}")]
    DiscoveryError(#[from] DiscoveryServerError),
    /// The transaction broadcaster actor could not be spawned (see `P2PContext::new`).
    #[error("Failed to start Tx Broadcaster: {0}")]
    TxBroadcasterError(#[from] TxBroadcasterError),
    /// Binding the discovery UDP socket failed. Built explicitly via `map_err` in
    /// `start_network`; there is no `From<std::io::Error>` conversion for this variant.
    #[error("Failed to bind UDP socket: {0}")]
    UdpSocketError(std::io::Error),
}
112
113pub async fn start_network(
114    context: P2PContext,
115    bootnodes: Vec<Node>,
116    config: DiscoveryConfig,
117) -> Result<(), NetworkError> {
118    let udp_socket = Arc::new(
119        UdpSocket::bind(context.network_config.bind_udp_addr())
120            .await
121            .map_err(NetworkError::UdpSocketError)?,
122    );
123
124    DiscoveryServer::spawn(
125        context.storage.clone(),
126        context.local_node.clone(),
127        context.signer,
128        udp_socket,
129        context.table.clone(),
130        bootnodes,
131        DiscoveryConfig {
132            initial_lookup_interval: context.initial_lookup_interval,
133            ..config
134        },
135    )
136    .await
137    .inspect_err(|e| {
138        error!("Failed to start discovery server: {e}");
139    })?;
140
141    context.tracker.spawn(serve_p2p_requests(context.clone()));
142
143    Ok(())
144}
145
146pub(crate) async fn serve_p2p_requests(context: P2PContext) {
147    let tcp_addr = context.network_config.bind_tcp_addr();
148    let external_tcp_addr = context.local_node.tcp_addr();
149    let listener = match listener(tcp_addr) {
150        Ok(result) => result,
151        Err(e) => {
152            error!("Error opening tcp socket at {tcp_addr}: {e}. Stopping p2p server");
153            return;
154        }
155    };
156    loop {
157        let (stream, peer_addr) = match listener.accept().await {
158            Ok(result) => result,
159            Err(e) => {
160                error!("Error receiving data from tcp socket {tcp_addr}: {e}. Stopping p2p server");
161                return;
162            }
163        };
164
165        if external_tcp_addr == peer_addr {
166            // Ignore connections from self
167            continue;
168        }
169
170        let _ = PeerConnection::spawn_as_receiver(context.clone(), peer_addr, stream);
171    }
172}
173
174fn listener(tcp_addr: SocketAddr) -> Result<TcpListener, io::Error> {
175    let tcp_socket = match tcp_addr {
176        SocketAddr::V4(_) => TcpSocket::new_v4(),
177        SocketAddr::V6(_) => TcpSocket::new_v6(),
178    }?;
179    tcp_socket.set_reuseport(true).ok();
180    tcp_socket.set_reuseaddr(true).ok();
181    tcp_socket.bind(tcp_addr)?;
182
183    tcp_socket.listen(50)
184}
185
/// Runs the two peer-stats loggers back to back.
///
/// First logs snap-sync progress until `periodically_show_peer_stats_during_syncing`
/// returns, then switches to `periodically_show_peer_stats_after_sync`. That second stage
/// is a `loop` with no exit, so this future is not expected to complete on its own.
pub async fn periodically_show_peer_stats(blockchain: Arc<Blockchain>, peer_table: PeerTable) {
    periodically_show_peer_stats_during_syncing(blockchain, &peer_table).await;
    periodically_show_peer_stats_after_sync(&peer_table).await;
}
190
/// Tracks metric values at phase start and from the previous interval for rate calculations
///
/// Each field is a raw counter value copied from the global `METRICS` by
/// `PhaseCounters::capture_current`; `Default` gives all zeros.
#[derive(Default, Clone, Copy)]
struct PhaseCounters {
    // Snapshot of `METRICS.downloaded_headers`.
    headers: u64,
    // Snapshot of `METRICS.downloaded_account_tries`.
    accounts: u64,
    // Snapshot of `METRICS.account_tries_inserted`.
    accounts_inserted: u64,
    // Snapshot of `METRICS.storage_leaves_downloaded`.
    storage: u64,
    // Snapshot of `METRICS.storage_leaves_inserted`.
    storage_inserted: u64,
    // Snapshot of `METRICS.global_state_trie_leafs_healed`.
    healed_accounts: u64,
    // Snapshot of `METRICS.global_storage_tries_leafs_healed`.
    healed_storage: u64,
    // Snapshot of `METRICS.downloaded_bytecodes`.
    bytecodes: u64,
}
203
204impl PhaseCounters {
205    fn capture_current() -> Self {
206        Self {
207            headers: METRICS.downloaded_headers.get(),
208            accounts: METRICS.downloaded_account_tries.load(Ordering::Relaxed),
209            accounts_inserted: METRICS.account_tries_inserted.load(Ordering::Relaxed),
210            storage: METRICS.storage_leaves_downloaded.get(),
211            storage_inserted: METRICS.storage_leaves_inserted.get(),
212            healed_accounts: METRICS
213                .global_state_trie_leafs_healed
214                .load(Ordering::Relaxed),
215            healed_storage: METRICS
216                .global_storage_tries_leafs_healed
217                .load(Ordering::Relaxed),
218            bytecodes: METRICS.downloaded_bytecodes.load(Ordering::Relaxed),
219        }
220    }
221}
222
223pub async fn periodically_show_peer_stats_during_syncing(
224    blockchain: Arc<Blockchain>,
225    peer_table: &PeerTable,
226) {
227    let start = std::time::Instant::now();
228    let mut previous_step = CurrentStepValue::None;
229    let mut phase_start_time = std::time::Instant::now();
230    let mut sync_started_logged = false;
231
232    // Track metrics at phase start for phase summaries
233    let mut phase_start = PhaseCounters::default();
234    // Track metrics from previous interval for rate calculations
235    let mut prev_interval = PhaseCounters::default();
236
237    loop {
238        if blockchain.is_synced() {
239            if !sync_started_logged {
240                info!("Node already has state; following chain via full sync");
241                return;
242            }
243            // Log sync complete summary
244            let total_elapsed = format_duration(start.elapsed());
245            let headers_downloaded = METRICS.downloaded_headers.get();
246            let accounts_downloaded = METRICS.downloaded_account_tries.load(Ordering::Relaxed);
247            let storage_downloaded = METRICS.storage_leaves_downloaded.get();
248            let bytecodes_downloaded = METRICS.downloaded_bytecodes.load(Ordering::Relaxed);
249            let healed_accounts = METRICS
250                .global_state_trie_leafs_healed
251                .load(Ordering::Relaxed);
252            let healed_storage = METRICS
253                .global_storage_tries_leafs_healed
254                .load(Ordering::Relaxed);
255
256            info!("");
257            info!(
258                "╭──────────────────────────────────────────────────────────────────────────────╮"
259            );
260            info!(
261                "│ SNAP SYNC COMPLETE                                                           │"
262            );
263            info!(
264                "├──────────────────────────────────────────────────────────────────────────────┤"
265            );
266            info!("│ {:<76}│", format!("Total time: {}", total_elapsed));
267            info!(
268                "├──────────────────────────────────────────────────────────────────────────────┤"
269            );
270            info!(
271                "│ Data summary:                                                                │"
272            );
273            let headers_accounts = format!(
274                "  Headers: {:<14} │  Accounts: {}",
275                format_thousands(headers_downloaded),
276                format_thousands(accounts_downloaded)
277            );
278            info!("│ {:<76}│", headers_accounts);
279            let storage_bytecodes = format!(
280                "  Storage: {:<14} │  Bytecodes: {}",
281                format_thousands(storage_downloaded),
282                format_thousands(bytecodes_downloaded)
283            );
284            info!("│ {:<76}│", storage_bytecodes);
285            let healed = format!(
286                "  Healed: {} state paths + {} storage accounts",
287                format_thousands(healed_accounts),
288                format_thousands(healed_storage)
289            );
290            info!("│ {:<76}│", healed);
291            info!(
292                "╰──────────────────────────────────────────────────────────────────────────────╯"
293            );
294            return;
295        }
296
297        let metrics_enabled = *METRICS.enabled.lock().await;
298        if !metrics_enabled {
299            tokio::time::sleep(Duration::from_secs(1)).await;
300            continue;
301        }
302
303        let current_step = METRICS.current_step.get();
304        let peer_number = peer_table.peer_count().await.unwrap_or(0);
305
306        // Log sync started banner when we have valid sync head data
307        if !sync_started_logged && current_step != CurrentStepValue::None {
308            let sync_head_block = METRICS.sync_head_block.load(Ordering::Relaxed);
309            let sync_head_hash = *METRICS.sync_head_hash.lock().await;
310
311            // Only show banner when sync_head data is populated (not genesis/default)
312            if sync_head_block > 0 && sync_head_hash != H256::zero() {
313                let head_short = format!("{:x}", sync_head_hash);
314                let head_short = &head_short[..8.min(head_short.len())];
315
316                info!("");
317                info!("╭─────────────────────────────────────────────────────────────╮");
318                info!("│ {:<59} │", "SNAP SYNC STARTED");
319                let target_content = format!(
320                    "Target: {}... (block #{})",
321                    head_short,
322                    format_thousands(sync_head_block)
323                );
324                info!("│ {:<59} │", target_content);
325                info!("│ {:<59} │", format!("Peers: {}", peer_number));
326                info!("╰─────────────────────────────────────────────────────────────╯");
327                sync_started_logged = true;
328            }
329        }
330
331        // Only show phase progress after the SNAP SYNC STARTED banner
332        if !sync_started_logged {
333            tokio::time::sleep(Duration::from_secs(1)).await;
334            continue;
335        }
336
337        // Detect phase transition
338        if current_step != previous_step && current_step != CurrentStepValue::None {
339            // Log completion of previous phase (if any)
340            if previous_step != CurrentStepValue::None {
341                // Force a final progress print so the bar doesn't look incomplete
342                let phase_elapsed = phase_start_time.elapsed();
343                let total_elapsed = format_duration(start.elapsed());
344                log_phase_progress(
345                    previous_step,
346                    phase_elapsed,
347                    &total_elapsed,
348                    peer_number,
349                    &prev_interval,
350                )
351                .await;
352
353                let phase_elapsed_str = format_duration(phase_start_time.elapsed());
354                log_phase_completion(
355                    previous_step,
356                    phase_elapsed_str,
357                    &phase_metrics(previous_step, &phase_start).await,
358                );
359
360                // Emit final metrics for completed phase
361                #[cfg(feature = "metrics")]
362                push_sync_prometheus_metrics(previous_step);
363            }
364
365            // Start new phase
366            phase_start_time = std::time::Instant::now();
367
368            // Record phase start timestamp for Grafana elapsed panels
369            #[cfg(feature = "metrics")]
370            {
371                let (_, phase_name) = phase_info(current_step);
372                let now = std::time::SystemTime::now()
373                    .duration_since(std::time::UNIX_EPOCH)
374                    .unwrap_or_default()
375                    .as_secs();
376                ethrex_metrics::sync::METRICS_SYNC
377                    .phase_start_timestamp
378                    .with_label_values(&[phase_name])
379                    .set(now as i64);
380            }
381
382            // Capture metrics at phase start
383            phase_start = PhaseCounters::capture_current();
384            prev_interval = phase_start;
385
386            log_phase_separator(current_step);
387            previous_step = current_step;
388        }
389
390        // Log phase-specific progress update
391        let phase_elapsed = phase_start_time.elapsed();
392        let total_elapsed = format_duration(start.elapsed());
393
394        log_phase_progress(
395            current_step,
396            phase_elapsed,
397            &total_elapsed,
398            peer_number,
399            &prev_interval,
400        )
401        .await;
402
403        // Push progress + peer health to Prometheus
404        #[cfg(feature = "metrics")]
405        {
406            push_sync_prometheus_metrics(current_step);
407            let diag = peer_table.get_peer_diagnostics().await.unwrap_or_default();
408            let snap_peers = diag
409                .iter()
410                .filter(|p| p.capabilities.iter().any(|c| c.starts_with("snap/")))
411                .count();
412            let eligible = diag.iter().filter(|p| p.eligible).count();
413            let inflight: i64 = diag.iter().map(|p| p.inflight_requests).sum();
414            ethrex_metrics::sync::METRICS_SYNC.set_snap_peers(snap_peers as i64);
415            ethrex_metrics::sync::METRICS_SYNC.set_eligible_peers(eligible as i64);
416            ethrex_metrics::sync::METRICS_SYNC.set_inflight_requests(inflight);
417        }
418
419        // Update previous interval counters for next rate calculation
420        prev_interval = PhaseCounters::capture_current();
421
422        tokio::time::sleep(Duration::from_secs(10)).await;
423    }
424}
425
426/// Returns (phase_number, phase_name) for the current step
427fn phase_info(step: CurrentStepValue) -> (u8, &'static str) {
428    match step {
429        CurrentStepValue::DownloadingHeaders => (1, "BLOCK HEADERS"),
430        CurrentStepValue::RequestingAccountRanges => (2, "ACCOUNT RANGES"),
431        CurrentStepValue::InsertingAccountRanges | CurrentStepValue::InsertingAccountRangesNoDb => {
432            (3, "ACCOUNT INSERTION")
433        }
434        CurrentStepValue::RequestingStorageRanges => (4, "STORAGE RANGES"),
435        CurrentStepValue::InsertingStorageRanges => (5, "STORAGE INSERTION"),
436        CurrentStepValue::HealingState => (6, "STATE HEALING"),
437        CurrentStepValue::HealingStorage => (7, "STORAGE HEALING"),
438        CurrentStepValue::RequestingBytecodes => (8, "BYTECODES"),
439        CurrentStepValue::None => (0, "UNKNOWN"),
440    }
441}
442
443fn log_phase_separator(step: CurrentStepValue) {
444    let (phase_num, phase_name) = phase_info(step);
445    let header = format!("── PHASE {}/8: {} ", phase_num, phase_name);
446    let header_width = header.chars().count();
447    let padding_width = 80usize.saturating_sub(header_width);
448    let padding = "─".repeat(padding_width);
449    info!("");
450    info!("{}{}", header, padding);
451}
452
453fn log_phase_completion(step: CurrentStepValue, elapsed: String, summary: &str) {
454    let (_, phase_name) = phase_info(step);
455    info!("✓ {} complete: {} in {}", phase_name, summary, elapsed);
456}
457
458async fn phase_metrics(step: CurrentStepValue, phase_start: &PhaseCounters) -> String {
459    match step {
460        CurrentStepValue::DownloadingHeaders => {
461            let downloaded = METRICS
462                .downloaded_headers
463                .get()
464                .saturating_sub(phase_start.headers);
465            format!("{} headers", format_thousands(downloaded))
466        }
467        CurrentStepValue::RequestingAccountRanges => {
468            let downloaded = METRICS
469                .downloaded_account_tries
470                .load(Ordering::Relaxed)
471                .saturating_sub(phase_start.accounts);
472            format!("{} accounts", format_thousands(downloaded))
473        }
474        CurrentStepValue::InsertingAccountRanges | CurrentStepValue::InsertingAccountRangesNoDb => {
475            let inserted = METRICS
476                .account_tries_inserted
477                .load(Ordering::Relaxed)
478                .saturating_sub(phase_start.accounts_inserted);
479            format!("{} accounts inserted", format_thousands(inserted))
480        }
481        CurrentStepValue::RequestingStorageRanges => {
482            let downloaded = METRICS
483                .storage_leaves_downloaded
484                .get()
485                .saturating_sub(phase_start.storage);
486            format!("{} storage slots", format_thousands(downloaded))
487        }
488        CurrentStepValue::InsertingStorageRanges => {
489            let inserted = METRICS
490                .storage_leaves_inserted
491                .get()
492                .saturating_sub(phase_start.storage_inserted);
493            format!("{} storage slots inserted", format_thousands(inserted))
494        }
495        CurrentStepValue::HealingState => {
496            let healed = METRICS
497                .global_state_trie_leafs_healed
498                .load(Ordering::Relaxed)
499                .saturating_sub(phase_start.healed_accounts);
500            format!("{} state paths healed", format_thousands(healed))
501        }
502        CurrentStepValue::HealingStorage => {
503            let healed = METRICS
504                .global_storage_tries_leafs_healed
505                .load(Ordering::Relaxed)
506                .saturating_sub(phase_start.healed_storage);
507            format!("{} storage accounts healed", format_thousands(healed))
508        }
509        CurrentStepValue::RequestingBytecodes => {
510            let downloaded = METRICS
511                .downloaded_bytecodes
512                .load(Ordering::Relaxed)
513                .saturating_sub(phase_start.bytecodes);
514            format!("{} bytecodes", format_thousands(downloaded))
515        }
516        CurrentStepValue::None => String::new(),
517    }
518}
519
/// Interval in seconds between progress updates
///
/// `log_phase_progress` divides each counter delta by this value to get a per-second
/// rate, so it has to match the time that actually elapses between two `prev_interval`
/// captures in the polling loop.
const PROGRESS_INTERVAL_SECS: u64 = 30;
522
523async fn log_phase_progress(
524    step: CurrentStepValue,
525    phase_elapsed: Duration,
526    total_elapsed: &str,
527    peer_count: usize,
528    prev_interval: &PhaseCounters,
529) {
530    let phase_elapsed_str = format_duration(phase_elapsed);
531
532    // Use consistent column widths: left column 40 chars, then │, then right column
533    let col1_width = 40;
534
535    match step {
536        CurrentStepValue::DownloadingHeaders => {
537            let headers_to_download = METRICS.sync_head_block.load(Ordering::Relaxed);
538            let headers_downloaded =
539                u64::min(METRICS.downloaded_headers.get(), headers_to_download);
540            let interval_downloaded = headers_downloaded.saturating_sub(prev_interval.headers);
541            let percentage = if headers_to_download == 0 {
542                0.0
543            } else {
544                (headers_downloaded as f64 / headers_to_download as f64) * 100.0
545            };
546            let rate = interval_downloaded / PROGRESS_INTERVAL_SECS;
547
548            let progress = progress_bar(percentage, 40);
549            info!("  {} {:>5.1}%", progress, percentage);
550            info!("");
551            let col1 = format!(
552                "Headers: {} / {}",
553                format_thousands(headers_downloaded),
554                format_thousands(headers_to_download)
555            );
556            info!("  {:<col1_width$} │  Elapsed: {}", col1, phase_elapsed_str);
557            let col1 = format!("Rate: {} headers/s", format_thousands(rate));
558            info!("  {:<col1_width$} │  Peers: {}", col1, peer_count);
559            info!("  Total time: {}", total_elapsed);
560        }
561        CurrentStepValue::RequestingAccountRanges => {
562            let accounts_downloaded = METRICS.downloaded_account_tries.load(Ordering::Relaxed);
563            let interval_downloaded = accounts_downloaded.saturating_sub(prev_interval.accounts);
564            let rate = interval_downloaded / PROGRESS_INTERVAL_SECS;
565
566            info!("");
567            let col1 = format!(
568                "Accounts fetched: {}",
569                format_thousands(accounts_downloaded)
570            );
571            info!("  {:<col1_width$} │  Elapsed: {}", col1, phase_elapsed_str);
572            let col1 = format!("Rate: {} accounts/s", format_thousands(rate));
573            info!("  {:<col1_width$} │  Peers: {}", col1, peer_count);
574            info!("  Total time: {}", total_elapsed);
575        }
576        CurrentStepValue::InsertingAccountRanges | CurrentStepValue::InsertingAccountRangesNoDb => {
577            let accounts_to_insert = METRICS.downloaded_account_tries.load(Ordering::Relaxed);
578            let accounts_inserted = METRICS.account_tries_inserted.load(Ordering::Relaxed);
579            let interval_inserted =
580                accounts_inserted.saturating_sub(prev_interval.accounts_inserted);
581            let percentage = if accounts_to_insert == 0 {
582                0.0
583            } else {
584                (accounts_inserted as f64 / accounts_to_insert as f64) * 100.0
585            };
586            let rate = interval_inserted / PROGRESS_INTERVAL_SECS;
587
588            let progress = progress_bar(percentage, 40);
589            info!("  {} {:>5.1}%", progress, percentage);
590            info!("");
591            let col1 = format!(
592                "Accounts: {} / {}",
593                format_thousands(accounts_inserted),
594                format_thousands(accounts_to_insert)
595            );
596            info!("  {:<col1_width$} │  Elapsed: {}", col1, phase_elapsed_str);
597            let col1 = format!("Rate: {} accounts/s", format_thousands(rate));
598            info!("  {:<col1_width$} │  Peers: {}", col1, peer_count);
599            info!("  Total time: {}", total_elapsed);
600        }
601        CurrentStepValue::RequestingStorageRanges => {
602            let storage_downloaded = METRICS.storage_leaves_downloaded.get();
603            let interval_downloaded = storage_downloaded.saturating_sub(prev_interval.storage);
604            let rate = interval_downloaded / PROGRESS_INTERVAL_SECS;
605
606            info!("");
607            let col1 = format!(
608                "Storage slots fetched: {}",
609                format_thousands(storage_downloaded)
610            );
611            info!("  {:<col1_width$} │  Elapsed: {}", col1, phase_elapsed_str);
612            let col1 = format!("Rate: {} slots/s", format_thousands(rate));
613            info!("  {:<col1_width$} │  Peers: {}", col1, peer_count);
614            info!("  Total time: {}", total_elapsed);
615        }
616        CurrentStepValue::InsertingStorageRanges => {
617            let storage_inserted = METRICS.storage_leaves_inserted.get();
618            let interval_inserted = storage_inserted.saturating_sub(prev_interval.storage_inserted);
619            let rate = interval_inserted / PROGRESS_INTERVAL_SECS;
620
621            info!("");
622            let col1 = format!(
623                "Storage slots inserted: {}",
624                format_thousands(storage_inserted)
625            );
626            info!("  {:<col1_width$} │  Elapsed: {}", col1, phase_elapsed_str);
627            let col1 = format!("Rate: {} slots/s", format_thousands(rate));
628            info!("  {:<col1_width$} │  Peers: {}", col1, peer_count);
629            info!("  Total time: {}", total_elapsed);
630        }
631        CurrentStepValue::HealingState => {
632            let healed = METRICS
633                .global_state_trie_leafs_healed
634                .load(Ordering::Relaxed);
635            let interval_healed = healed.saturating_sub(prev_interval.healed_accounts);
636            let rate = interval_healed / PROGRESS_INTERVAL_SECS;
637
638            info!("");
639            let col1 = format!("State paths healed: {}", format_thousands(healed));
640            info!("  {:<col1_width$} │  Elapsed: {}", col1, phase_elapsed_str);
641            let col1 = format!("Rate: {} paths/s", format_thousands(rate));
642            info!("  {:<col1_width$} │  Peers: {}", col1, peer_count);
643            info!("  Total time: {}", total_elapsed);
644        }
645        CurrentStepValue::HealingStorage => {
646            let healed = METRICS
647                .global_storage_tries_leafs_healed
648                .load(Ordering::Relaxed);
649            let interval_healed = healed.saturating_sub(prev_interval.healed_storage);
650            let rate = interval_healed / PROGRESS_INTERVAL_SECS;
651
652            info!("");
653            let col1 = format!("Storage accounts healed: {}", format_thousands(healed));
654            info!("  {:<col1_width$} │  Elapsed: {}", col1, phase_elapsed_str);
655            let col1 = format!("Rate: {} accounts/s", format_thousands(rate));
656            info!("  {:<col1_width$} │  Peers: {}", col1, peer_count);
657            info!("  Total time: {}", total_elapsed);
658        }
659        CurrentStepValue::RequestingBytecodes => {
660            let bytecodes_to_download = METRICS.bytecodes_to_download.load(Ordering::Relaxed);
661            let bytecodes_downloaded = METRICS.downloaded_bytecodes.load(Ordering::Relaxed);
662            let interval_downloaded = bytecodes_downloaded.saturating_sub(prev_interval.bytecodes);
663            let percentage = if bytecodes_to_download == 0 {
664                0.0
665            } else {
666                (bytecodes_downloaded as f64 / bytecodes_to_download as f64) * 100.0
667            };
668            let rate = interval_downloaded / PROGRESS_INTERVAL_SECS;
669
670            let progress = progress_bar(percentage, 40);
671            info!("  {} {:>5.1}%", progress, percentage);
672            info!("");
673            let col1 = format!(
674                "Bytecodes: {} / {}",
675                format_thousands(bytecodes_downloaded),
676                format_thousands(bytecodes_to_download)
677            );
678            info!("  {:<col1_width$} │  Elapsed: {}", col1, phase_elapsed_str);
679            let col1 = format!("Rate: {} codes/s", format_thousands(rate));
680            info!("  {:<col1_width$} │  Peers: {}", col1, peer_count);
681            info!("  Total time: {}", total_elapsed);
682        }
683        CurrentStepValue::None => {}
684    }
685}
686
/// Copies snap-sync progress from the in-process `METRICS` counters into the Prometheus
/// gauges of `METRICS_SYNC`.
///
/// Always sets the stage number and pivot block. The pivot timestamp and pivot age are
/// only updated once a non-zero pivot timestamp is known. Of the per-phase gauges, only
/// those belonging to `step` are written. No rates are computed here; they are left to
/// `rate()` on the dashboard side.
#[cfg(feature = "metrics")]
fn push_sync_prometheus_metrics(step: CurrentStepValue) {
    use ethrex_metrics::sync::METRICS_SYNC;
    use std::sync::atomic::Ordering::Relaxed;

    let stage = phase_info(step).0;
    METRICS_SYNC.stage.set(stage as i64);
    let pivot_block = METRICS.sync_head_block.load(Relaxed);
    METRICS_SYNC.pivot_block.set(pivot_block as i64);

    let pivot_ts = METRICS.pivot_timestamp.load(Relaxed);
    if pivot_ts > 0 {
        // Raw timestamp: dashboards derive the age as time() - timestamp.
        METRICS_SYNC.pivot_timestamp.set(pivot_ts as i64);

        // Precomputed age for RPC/peer_top consumers.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let age = now.saturating_sub(pivot_ts);
        METRICS_SYNC.pivot_age_seconds.set(age as i64);
    }

    match step {
        CurrentStepValue::None => {}
        CurrentStepValue::DownloadingHeaders => {
            let target = METRICS.sync_head_block.load(Relaxed);
            // Clamp so the gauge never exceeds the target.
            let done = u64::min(METRICS.downloaded_headers.get(), target);
            METRICS_SYNC.headers_downloaded.set(done as i64);
            METRICS_SYNC.headers_total.set(target as i64);
        }
        CurrentStepValue::RequestingAccountRanges => {
            let fetched = METRICS.downloaded_account_tries.load(Relaxed);
            METRICS_SYNC.accounts_downloaded.set(fetched as i64);
        }
        CurrentStepValue::InsertingAccountRanges | CurrentStepValue::InsertingAccountRangesNoDb => {
            let fetched = METRICS.downloaded_account_tries.load(Relaxed);
            let stored = METRICS.account_tries_inserted.load(Relaxed);
            METRICS_SYNC.accounts_downloaded.set(fetched as i64);
            METRICS_SYNC.accounts_inserted.set(stored as i64);
        }
        CurrentStepValue::RequestingStorageRanges => {
            let fetched = METRICS.storage_leaves_downloaded.get();
            METRICS_SYNC.storage_downloaded.set(fetched as i64);
        }
        CurrentStepValue::InsertingStorageRanges => {
            let stored = METRICS.storage_leaves_inserted.get();
            METRICS_SYNC.storage_inserted.set(stored as i64);
        }
        CurrentStepValue::HealingState => {
            let healed = METRICS.global_state_trie_leafs_healed.load(Relaxed);
            METRICS_SYNC.state_leaves_healed.set(healed as i64);
        }
        CurrentStepValue::HealingStorage => {
            let healed = METRICS.global_storage_tries_leafs_healed.load(Relaxed);
            METRICS_SYNC.storage_leaves_healed.set(healed as i64);
        }
        CurrentStepValue::RequestingBytecodes => {
            let target = METRICS.bytecodes_to_download.load(Relaxed);
            let done = METRICS.downloaded_bytecodes.load(Relaxed);
            METRICS_SYNC.bytecodes_downloaded.set(done as i64);
            METRICS_SYNC.bytecodes_total.set(target as i64);
        }
    }
}
758
/// Renders a text progress bar of exactly `width` cells: `▓` for the completed share and
/// `░` for the rest.
///
/// `percentage` is clamped to `0.0..=100.0`, and the number of filled cells is rounded
/// down. A NaN percentage produces an empty bar.
fn progress_bar(percentage: f64, width: usize) -> String {
    let fraction = percentage.clamp(0.0, 100.0) / 100.0;
    // Float-to-int casts truncate (and map NaN to 0); cap at `width` as a safety net.
    let done_cells = ((fraction * width as f64) as usize).min(width);
    let todo_cells = width - done_cells;

    std::iter::repeat('▓')
        .take(done_cells)
        .chain(std::iter::repeat('░').take(todo_cells))
        .collect()
}
766
/// Formats `n` in decimal with a comma between every group of three digits,
/// e.g. `1234567` becomes `"1,234,567"`.
fn format_thousands(n: u64) -> String {
    let digits = n.to_string();
    let len = digits.len();
    let mut grouped = String::with_capacity(len + len / 3);

    for (idx, digit) in digits.chars().enumerate() {
        // A comma goes before every digit that has a multiple of three digits after it
        // (itself included), except in front of the very first digit.
        if idx > 0 && (len - idx) % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(digit);
    }
    grouped
}
778
779/// Shows the amount of connected peers, active peers, and peers suitable for snap sync on a set interval
780pub async fn periodically_show_peer_stats_after_sync(peer_table: &PeerTable) {
781    const INTERVAL_DURATION: tokio::time::Duration = tokio::time::Duration::from_secs(60);
782    let mut interval = tokio::time::interval(INTERVAL_DURATION);
783    loop {
784        // clone peers to keep the lock short
785        let peers: Vec<PeerData> = peer_table.get_peers_data().await.unwrap_or(Vec::new());
786        let active_peers = peers
787            .iter()
788            .filter(|peer| -> bool { peer.connection.as_ref().is_some() })
789            .count();
790        let snap_active_peers = peers
791            .iter()
792            .filter(|peer| -> bool {
793                peer.connection.as_ref().is_some()
794                    && SUPPORTED_SNAP_CAPABILITIES
795                        .iter()
796                        .any(|cap| peer.supported_capabilities.contains(cap))
797            })
798            .count();
799        info!("Peers: {active_peers} (snap-capable: {snap_active_peers})");
800        interval.tick().await;
801    }
802}
803
/// Formats a duration as `HH:MM:SS`, dropping any sub-second part.
///
/// Each field is zero-padded to two digits; hours are not wrapped, so durations of 100
/// hours or more simply use more digits.
fn format_duration(duration: Duration) -> String {
    let whole_secs = duration.as_secs();
    let (hours, within_hour) = (whole_secs / 3600, whole_secs % 3600);
    let (minutes, seconds) = (within_hour / 60, within_hour % 60);
    format!("{:02}:{:02}:{:02}", hours, minutes, seconds)
}