1#[cfg(feature = "l2")]
2use crate::rlpx::l2::l2_connection::P2PBasedContext;
/// Placeholder used when the `l2` feature is disabled, so that signatures
/// mentioning `P2PBasedContext` (e.g. `P2PContext::new`) still compile.
#[cfg(not(feature = "l2"))]
#[derive(Debug, Clone)]
pub struct P2PBasedContext;
6use crate::{
7 discovery::{DiscoveryConfig, DiscoveryServer, DiscoveryServerError},
8 metrics::{CurrentStepValue, METRICS},
9 peer_table::{PeerData, PeerTable, PeerTableServerProtocol as _},
10 rlpx::{
11 connection::server::{PeerConnBroadcastSender, PeerConnection},
12 message::Message,
13 p2p::SUPPORTED_SNAP_CAPABILITIES,
14 },
15 tx_broadcaster::{TxBroadcaster, TxBroadcasterError},
16 types::{NetworkConfig, Node},
17};
18use ethrex_blockchain::Blockchain;
19use ethrex_common::H256;
20use ethrex_storage::Store;
21use secp256k1::SecretKey;
22use spawned_concurrency::tasks::ActorRef;
23use std::{
24 io,
25 net::SocketAddr,
26 sync::{Arc, atomic::Ordering},
27 time::Duration,
28};
29use tokio::net::{TcpListener, TcpSocket, UdpSocket};
30use tokio_util::task::TaskTracker;
31use tracing::{error, info};
32
/// Capacity of the broadcast channel created in `P2PContext::new`.
pub const MAX_MESSAGES_TO_BROADCAST: usize = 100_000;
34
35#[derive(Clone, Debug)]
36pub struct P2PContext {
37 pub tracker: TaskTracker,
38 pub signer: SecretKey,
39 pub table: PeerTable,
40 pub storage: Store,
41 pub blockchain: Arc<Blockchain>,
42 pub(crate) broadcast: PeerConnBroadcastSender,
43 pub local_node: Node,
44 pub network_config: NetworkConfig,
46 pub client_version: String,
47 #[cfg(feature = "l2")]
48 pub based_context: Option<P2PBasedContext>,
49 pub tx_broadcaster: ActorRef<TxBroadcaster>,
50 pub initial_lookup_interval: f64,
51}
52
53impl P2PContext {
54 #[allow(clippy::too_many_arguments)]
55 pub fn new(
56 local_node: Node,
57 network_config: NetworkConfig,
58 tracker: TaskTracker,
59 signer: SecretKey,
60 peer_table: PeerTable,
61 storage: Store,
62 blockchain: Arc<Blockchain>,
63 client_version: String,
64 based_context: Option<P2PBasedContext>,
65 tx_broadcasting_time_interval: u64,
66 lookup_interval: f64,
67 ) -> Result<Self, NetworkError> {
68 let (channel_broadcast_send_end, _) = tokio::sync::broadcast::channel::<(
69 tokio::task::Id,
70 Arc<Message>,
71 )>(MAX_MESSAGES_TO_BROADCAST);
72
73 let tx_broadcaster = TxBroadcaster::spawn(
74 peer_table.clone(),
75 blockchain.clone(),
76 tx_broadcasting_time_interval,
77 )
78 .inspect_err(|e| {
79 error!("Failed to start Tx Broadcaster: {e}");
80 })?;
81
82 #[cfg(not(feature = "l2"))]
83 let _ = &based_context;
84
85 Ok(P2PContext {
86 local_node,
87 network_config,
88 tracker,
89 signer,
90 table: peer_table,
91 storage,
92 blockchain,
93 broadcast: channel_broadcast_send_end,
94 client_version,
95 #[cfg(feature = "l2")]
96 based_context,
97 tx_broadcaster,
98 initial_lookup_interval: lookup_interval,
99 })
100 }
101}
102
103#[derive(Debug, thiserror::Error)]
104pub enum NetworkError {
105 #[error("Failed to start discovery server: {0}")]
106 DiscoveryError(#[from] DiscoveryServerError),
107 #[error("Failed to start Tx Broadcaster: {0}")]
108 TxBroadcasterError(#[from] TxBroadcasterError),
109 #[error("Failed to bind UDP socket: {0}")]
110 UdpSocketError(std::io::Error),
111}
112
113pub async fn start_network(
114 context: P2PContext,
115 bootnodes: Vec<Node>,
116 config: DiscoveryConfig,
117) -> Result<(), NetworkError> {
118 let udp_socket = Arc::new(
119 UdpSocket::bind(context.network_config.bind_udp_addr())
120 .await
121 .map_err(NetworkError::UdpSocketError)?,
122 );
123
124 DiscoveryServer::spawn(
125 context.storage.clone(),
126 context.local_node.clone(),
127 context.signer,
128 udp_socket,
129 context.table.clone(),
130 bootnodes,
131 DiscoveryConfig {
132 initial_lookup_interval: context.initial_lookup_interval,
133 ..config
134 },
135 )
136 .await
137 .inspect_err(|e| {
138 error!("Failed to start discovery server: {e}");
139 })?;
140
141 context.tracker.spawn(serve_p2p_requests(context.clone()));
142
143 Ok(())
144}
145
146pub(crate) async fn serve_p2p_requests(context: P2PContext) {
147 let tcp_addr = context.network_config.bind_tcp_addr();
148 let external_tcp_addr = context.local_node.tcp_addr();
149 let listener = match listener(tcp_addr) {
150 Ok(result) => result,
151 Err(e) => {
152 error!("Error opening tcp socket at {tcp_addr}: {e}. Stopping p2p server");
153 return;
154 }
155 };
156 loop {
157 let (stream, peer_addr) = match listener.accept().await {
158 Ok(result) => result,
159 Err(e) => {
160 error!("Error receiving data from tcp socket {tcp_addr}: {e}. Stopping p2p server");
161 return;
162 }
163 };
164
165 if external_tcp_addr == peer_addr {
166 continue;
168 }
169
170 let _ = PeerConnection::spawn_as_receiver(context.clone(), peer_addr, stream);
171 }
172}
173
174fn listener(tcp_addr: SocketAddr) -> Result<TcpListener, io::Error> {
175 let tcp_socket = match tcp_addr {
176 SocketAddr::V4(_) => TcpSocket::new_v4(),
177 SocketAddr::V6(_) => TcpSocket::new_v6(),
178 }?;
179 tcp_socket.set_reuseport(true).ok();
180 tcp_socket.set_reuseaddr(true).ok();
181 tcp_socket.bind(tcp_addr)?;
182
183 tcp_socket.listen(50)
184}
185
186pub async fn periodically_show_peer_stats(blockchain: Arc<Blockchain>, peer_table: PeerTable) {
187 periodically_show_peer_stats_during_syncing(blockchain, &peer_table).await;
188 periodically_show_peer_stats_after_sync(&peer_table).await;
189}
190
/// Snapshot of the sync counters read from `METRICS`.
///
/// Snapshots are subtracted from later readings to obtain per-phase totals
/// and per-interval deltas.
#[derive(Clone, Copy, Default)]
struct PhaseCounters {
    /// `METRICS.downloaded_headers`
    headers: u64,
    /// `METRICS.downloaded_account_tries`
    accounts: u64,
    /// `METRICS.account_tries_inserted`
    accounts_inserted: u64,
    /// `METRICS.storage_leaves_downloaded`
    storage: u64,
    /// `METRICS.storage_leaves_inserted`
    storage_inserted: u64,
    /// `METRICS.global_state_trie_leafs_healed`
    healed_accounts: u64,
    /// `METRICS.global_storage_tries_leafs_healed`
    healed_storage: u64,
    /// `METRICS.downloaded_bytecodes`
    bytecodes: u64,
}
203
204impl PhaseCounters {
205 fn capture_current() -> Self {
206 Self {
207 headers: METRICS.downloaded_headers.get(),
208 accounts: METRICS.downloaded_account_tries.load(Ordering::Relaxed),
209 accounts_inserted: METRICS.account_tries_inserted.load(Ordering::Relaxed),
210 storage: METRICS.storage_leaves_downloaded.get(),
211 storage_inserted: METRICS.storage_leaves_inserted.get(),
212 healed_accounts: METRICS
213 .global_state_trie_leafs_healed
214 .load(Ordering::Relaxed),
215 healed_storage: METRICS
216 .global_storage_tries_leafs_healed
217 .load(Ordering::Relaxed),
218 bytecodes: METRICS.downloaded_bytecodes.load(Ordering::Relaxed),
219 }
220 }
221}
222
223pub async fn periodically_show_peer_stats_during_syncing(
224 blockchain: Arc<Blockchain>,
225 peer_table: &PeerTable,
226) {
227 let start = std::time::Instant::now();
228 let mut previous_step = CurrentStepValue::None;
229 let mut phase_start_time = std::time::Instant::now();
230 let mut sync_started_logged = false;
231
232 let mut phase_start = PhaseCounters::default();
234 let mut prev_interval = PhaseCounters::default();
236
237 loop {
238 if blockchain.is_synced() {
239 if !sync_started_logged {
240 info!("Node already has state; following chain via full sync");
241 return;
242 }
243 let total_elapsed = format_duration(start.elapsed());
245 let headers_downloaded = METRICS.downloaded_headers.get();
246 let accounts_downloaded = METRICS.downloaded_account_tries.load(Ordering::Relaxed);
247 let storage_downloaded = METRICS.storage_leaves_downloaded.get();
248 let bytecodes_downloaded = METRICS.downloaded_bytecodes.load(Ordering::Relaxed);
249 let healed_accounts = METRICS
250 .global_state_trie_leafs_healed
251 .load(Ordering::Relaxed);
252 let healed_storage = METRICS
253 .global_storage_tries_leafs_healed
254 .load(Ordering::Relaxed);
255
256 info!("");
257 info!(
258 "╭──────────────────────────────────────────────────────────────────────────────╮"
259 );
260 info!(
261 "│ SNAP SYNC COMPLETE │"
262 );
263 info!(
264 "├──────────────────────────────────────────────────────────────────────────────┤"
265 );
266 info!("│ {:<76}│", format!("Total time: {}", total_elapsed));
267 info!(
268 "├──────────────────────────────────────────────────────────────────────────────┤"
269 );
270 info!(
271 "│ Data summary: │"
272 );
273 let headers_accounts = format!(
274 " Headers: {:<14} │ Accounts: {}",
275 format_thousands(headers_downloaded),
276 format_thousands(accounts_downloaded)
277 );
278 info!("│ {:<76}│", headers_accounts);
279 let storage_bytecodes = format!(
280 " Storage: {:<14} │ Bytecodes: {}",
281 format_thousands(storage_downloaded),
282 format_thousands(bytecodes_downloaded)
283 );
284 info!("│ {:<76}│", storage_bytecodes);
285 let healed = format!(
286 " Healed: {} state paths + {} storage accounts",
287 format_thousands(healed_accounts),
288 format_thousands(healed_storage)
289 );
290 info!("│ {:<76}│", healed);
291 info!(
292 "╰──────────────────────────────────────────────────────────────────────────────╯"
293 );
294 return;
295 }
296
297 let metrics_enabled = *METRICS.enabled.lock().await;
298 if !metrics_enabled {
299 tokio::time::sleep(Duration::from_secs(1)).await;
300 continue;
301 }
302
303 let current_step = METRICS.current_step.get();
304 let peer_number = peer_table.peer_count().await.unwrap_or(0);
305
306 if !sync_started_logged && current_step != CurrentStepValue::None {
308 let sync_head_block = METRICS.sync_head_block.load(Ordering::Relaxed);
309 let sync_head_hash = *METRICS.sync_head_hash.lock().await;
310
311 if sync_head_block > 0 && sync_head_hash != H256::zero() {
313 let head_short = format!("{:x}", sync_head_hash);
314 let head_short = &head_short[..8.min(head_short.len())];
315
316 info!("");
317 info!("╭─────────────────────────────────────────────────────────────╮");
318 info!("│ {:<59} │", "SNAP SYNC STARTED");
319 let target_content = format!(
320 "Target: {}... (block #{})",
321 head_short,
322 format_thousands(sync_head_block)
323 );
324 info!("│ {:<59} │", target_content);
325 info!("│ {:<59} │", format!("Peers: {}", peer_number));
326 info!("╰─────────────────────────────────────────────────────────────╯");
327 sync_started_logged = true;
328 }
329 }
330
331 if !sync_started_logged {
333 tokio::time::sleep(Duration::from_secs(1)).await;
334 continue;
335 }
336
337 if current_step != previous_step && current_step != CurrentStepValue::None {
339 if previous_step != CurrentStepValue::None {
341 let phase_elapsed = phase_start_time.elapsed();
343 let total_elapsed = format_duration(start.elapsed());
344 log_phase_progress(
345 previous_step,
346 phase_elapsed,
347 &total_elapsed,
348 peer_number,
349 &prev_interval,
350 )
351 .await;
352
353 let phase_elapsed_str = format_duration(phase_start_time.elapsed());
354 log_phase_completion(
355 previous_step,
356 phase_elapsed_str,
357 &phase_metrics(previous_step, &phase_start).await,
358 );
359
360 #[cfg(feature = "metrics")]
362 push_sync_prometheus_metrics(previous_step);
363 }
364
365 phase_start_time = std::time::Instant::now();
367
368 #[cfg(feature = "metrics")]
370 {
371 let (_, phase_name) = phase_info(current_step);
372 let now = std::time::SystemTime::now()
373 .duration_since(std::time::UNIX_EPOCH)
374 .unwrap_or_default()
375 .as_secs();
376 ethrex_metrics::sync::METRICS_SYNC
377 .phase_start_timestamp
378 .with_label_values(&[phase_name])
379 .set(now as i64);
380 }
381
382 phase_start = PhaseCounters::capture_current();
384 prev_interval = phase_start;
385
386 log_phase_separator(current_step);
387 previous_step = current_step;
388 }
389
390 let phase_elapsed = phase_start_time.elapsed();
392 let total_elapsed = format_duration(start.elapsed());
393
394 log_phase_progress(
395 current_step,
396 phase_elapsed,
397 &total_elapsed,
398 peer_number,
399 &prev_interval,
400 )
401 .await;
402
403 #[cfg(feature = "metrics")]
405 {
406 push_sync_prometheus_metrics(current_step);
407 let diag = peer_table.get_peer_diagnostics().await.unwrap_or_default();
408 let snap_peers = diag
409 .iter()
410 .filter(|p| p.capabilities.iter().any(|c| c.starts_with("snap/")))
411 .count();
412 let eligible = diag.iter().filter(|p| p.eligible).count();
413 let inflight: i64 = diag.iter().map(|p| p.inflight_requests).sum();
414 ethrex_metrics::sync::METRICS_SYNC.set_snap_peers(snap_peers as i64);
415 ethrex_metrics::sync::METRICS_SYNC.set_eligible_peers(eligible as i64);
416 ethrex_metrics::sync::METRICS_SYNC.set_inflight_requests(inflight);
417 }
418
419 prev_interval = PhaseCounters::capture_current();
421
422 tokio::time::sleep(Duration::from_secs(10)).await;
423 }
424}
425
426fn phase_info(step: CurrentStepValue) -> (u8, &'static str) {
428 match step {
429 CurrentStepValue::DownloadingHeaders => (1, "BLOCK HEADERS"),
430 CurrentStepValue::RequestingAccountRanges => (2, "ACCOUNT RANGES"),
431 CurrentStepValue::InsertingAccountRanges | CurrentStepValue::InsertingAccountRangesNoDb => {
432 (3, "ACCOUNT INSERTION")
433 }
434 CurrentStepValue::RequestingStorageRanges => (4, "STORAGE RANGES"),
435 CurrentStepValue::InsertingStorageRanges => (5, "STORAGE INSERTION"),
436 CurrentStepValue::HealingState => (6, "STATE HEALING"),
437 CurrentStepValue::HealingStorage => (7, "STORAGE HEALING"),
438 CurrentStepValue::RequestingBytecodes => (8, "BYTECODES"),
439 CurrentStepValue::None => (0, "UNKNOWN"),
440 }
441}
442
443fn log_phase_separator(step: CurrentStepValue) {
444 let (phase_num, phase_name) = phase_info(step);
445 let header = format!("── PHASE {}/8: {} ", phase_num, phase_name);
446 let header_width = header.chars().count();
447 let padding_width = 80usize.saturating_sub(header_width);
448 let padding = "─".repeat(padding_width);
449 info!("");
450 info!("{}{}", header, padding);
451}
452
453fn log_phase_completion(step: CurrentStepValue, elapsed: String, summary: &str) {
454 let (_, phase_name) = phase_info(step);
455 info!("✓ {} complete: {} in {}", phase_name, summary, elapsed);
456}
457
458async fn phase_metrics(step: CurrentStepValue, phase_start: &PhaseCounters) -> String {
459 match step {
460 CurrentStepValue::DownloadingHeaders => {
461 let downloaded = METRICS
462 .downloaded_headers
463 .get()
464 .saturating_sub(phase_start.headers);
465 format!("{} headers", format_thousands(downloaded))
466 }
467 CurrentStepValue::RequestingAccountRanges => {
468 let downloaded = METRICS
469 .downloaded_account_tries
470 .load(Ordering::Relaxed)
471 .saturating_sub(phase_start.accounts);
472 format!("{} accounts", format_thousands(downloaded))
473 }
474 CurrentStepValue::InsertingAccountRanges | CurrentStepValue::InsertingAccountRangesNoDb => {
475 let inserted = METRICS
476 .account_tries_inserted
477 .load(Ordering::Relaxed)
478 .saturating_sub(phase_start.accounts_inserted);
479 format!("{} accounts inserted", format_thousands(inserted))
480 }
481 CurrentStepValue::RequestingStorageRanges => {
482 let downloaded = METRICS
483 .storage_leaves_downloaded
484 .get()
485 .saturating_sub(phase_start.storage);
486 format!("{} storage slots", format_thousands(downloaded))
487 }
488 CurrentStepValue::InsertingStorageRanges => {
489 let inserted = METRICS
490 .storage_leaves_inserted
491 .get()
492 .saturating_sub(phase_start.storage_inserted);
493 format!("{} storage slots inserted", format_thousands(inserted))
494 }
495 CurrentStepValue::HealingState => {
496 let healed = METRICS
497 .global_state_trie_leafs_healed
498 .load(Ordering::Relaxed)
499 .saturating_sub(phase_start.healed_accounts);
500 format!("{} state paths healed", format_thousands(healed))
501 }
502 CurrentStepValue::HealingStorage => {
503 let healed = METRICS
504 .global_storage_tries_leafs_healed
505 .load(Ordering::Relaxed)
506 .saturating_sub(phase_start.healed_storage);
507 format!("{} storage accounts healed", format_thousands(healed))
508 }
509 CurrentStepValue::RequestingBytecodes => {
510 let downloaded = METRICS
511 .downloaded_bytecodes
512 .load(Ordering::Relaxed)
513 .saturating_sub(phase_start.bytecodes);
514 format!("{} bytecodes", format_thousands(downloaded))
515 }
516 CurrentStepValue::None => String::new(),
517 }
518}
519
/// Number of seconds assumed to separate two progress reports; used as the
/// divisor that turns a per-interval counter delta into a per-second rate.
const PROGRESS_INTERVAL_SECS: u64 = 30;
522
523async fn log_phase_progress(
524 step: CurrentStepValue,
525 phase_elapsed: Duration,
526 total_elapsed: &str,
527 peer_count: usize,
528 prev_interval: &PhaseCounters,
529) {
530 let phase_elapsed_str = format_duration(phase_elapsed);
531
532 let col1_width = 40;
534
535 match step {
536 CurrentStepValue::DownloadingHeaders => {
537 let headers_to_download = METRICS.sync_head_block.load(Ordering::Relaxed);
538 let headers_downloaded =
539 u64::min(METRICS.downloaded_headers.get(), headers_to_download);
540 let interval_downloaded = headers_downloaded.saturating_sub(prev_interval.headers);
541 let percentage = if headers_to_download == 0 {
542 0.0
543 } else {
544 (headers_downloaded as f64 / headers_to_download as f64) * 100.0
545 };
546 let rate = interval_downloaded / PROGRESS_INTERVAL_SECS;
547
548 let progress = progress_bar(percentage, 40);
549 info!(" {} {:>5.1}%", progress, percentage);
550 info!("");
551 let col1 = format!(
552 "Headers: {} / {}",
553 format_thousands(headers_downloaded),
554 format_thousands(headers_to_download)
555 );
556 info!(" {:<col1_width$} │ Elapsed: {}", col1, phase_elapsed_str);
557 let col1 = format!("Rate: {} headers/s", format_thousands(rate));
558 info!(" {:<col1_width$} │ Peers: {}", col1, peer_count);
559 info!(" Total time: {}", total_elapsed);
560 }
561 CurrentStepValue::RequestingAccountRanges => {
562 let accounts_downloaded = METRICS.downloaded_account_tries.load(Ordering::Relaxed);
563 let interval_downloaded = accounts_downloaded.saturating_sub(prev_interval.accounts);
564 let rate = interval_downloaded / PROGRESS_INTERVAL_SECS;
565
566 info!("");
567 let col1 = format!(
568 "Accounts fetched: {}",
569 format_thousands(accounts_downloaded)
570 );
571 info!(" {:<col1_width$} │ Elapsed: {}", col1, phase_elapsed_str);
572 let col1 = format!("Rate: {} accounts/s", format_thousands(rate));
573 info!(" {:<col1_width$} │ Peers: {}", col1, peer_count);
574 info!(" Total time: {}", total_elapsed);
575 }
576 CurrentStepValue::InsertingAccountRanges | CurrentStepValue::InsertingAccountRangesNoDb => {
577 let accounts_to_insert = METRICS.downloaded_account_tries.load(Ordering::Relaxed);
578 let accounts_inserted = METRICS.account_tries_inserted.load(Ordering::Relaxed);
579 let interval_inserted =
580 accounts_inserted.saturating_sub(prev_interval.accounts_inserted);
581 let percentage = if accounts_to_insert == 0 {
582 0.0
583 } else {
584 (accounts_inserted as f64 / accounts_to_insert as f64) * 100.0
585 };
586 let rate = interval_inserted / PROGRESS_INTERVAL_SECS;
587
588 let progress = progress_bar(percentage, 40);
589 info!(" {} {:>5.1}%", progress, percentage);
590 info!("");
591 let col1 = format!(
592 "Accounts: {} / {}",
593 format_thousands(accounts_inserted),
594 format_thousands(accounts_to_insert)
595 );
596 info!(" {:<col1_width$} │ Elapsed: {}", col1, phase_elapsed_str);
597 let col1 = format!("Rate: {} accounts/s", format_thousands(rate));
598 info!(" {:<col1_width$} │ Peers: {}", col1, peer_count);
599 info!(" Total time: {}", total_elapsed);
600 }
601 CurrentStepValue::RequestingStorageRanges => {
602 let storage_downloaded = METRICS.storage_leaves_downloaded.get();
603 let interval_downloaded = storage_downloaded.saturating_sub(prev_interval.storage);
604 let rate = interval_downloaded / PROGRESS_INTERVAL_SECS;
605
606 info!("");
607 let col1 = format!(
608 "Storage slots fetched: {}",
609 format_thousands(storage_downloaded)
610 );
611 info!(" {:<col1_width$} │ Elapsed: {}", col1, phase_elapsed_str);
612 let col1 = format!("Rate: {} slots/s", format_thousands(rate));
613 info!(" {:<col1_width$} │ Peers: {}", col1, peer_count);
614 info!(" Total time: {}", total_elapsed);
615 }
616 CurrentStepValue::InsertingStorageRanges => {
617 let storage_inserted = METRICS.storage_leaves_inserted.get();
618 let interval_inserted = storage_inserted.saturating_sub(prev_interval.storage_inserted);
619 let rate = interval_inserted / PROGRESS_INTERVAL_SECS;
620
621 info!("");
622 let col1 = format!(
623 "Storage slots inserted: {}",
624 format_thousands(storage_inserted)
625 );
626 info!(" {:<col1_width$} │ Elapsed: {}", col1, phase_elapsed_str);
627 let col1 = format!("Rate: {} slots/s", format_thousands(rate));
628 info!(" {:<col1_width$} │ Peers: {}", col1, peer_count);
629 info!(" Total time: {}", total_elapsed);
630 }
631 CurrentStepValue::HealingState => {
632 let healed = METRICS
633 .global_state_trie_leafs_healed
634 .load(Ordering::Relaxed);
635 let interval_healed = healed.saturating_sub(prev_interval.healed_accounts);
636 let rate = interval_healed / PROGRESS_INTERVAL_SECS;
637
638 info!("");
639 let col1 = format!("State paths healed: {}", format_thousands(healed));
640 info!(" {:<col1_width$} │ Elapsed: {}", col1, phase_elapsed_str);
641 let col1 = format!("Rate: {} paths/s", format_thousands(rate));
642 info!(" {:<col1_width$} │ Peers: {}", col1, peer_count);
643 info!(" Total time: {}", total_elapsed);
644 }
645 CurrentStepValue::HealingStorage => {
646 let healed = METRICS
647 .global_storage_tries_leafs_healed
648 .load(Ordering::Relaxed);
649 let interval_healed = healed.saturating_sub(prev_interval.healed_storage);
650 let rate = interval_healed / PROGRESS_INTERVAL_SECS;
651
652 info!("");
653 let col1 = format!("Storage accounts healed: {}", format_thousands(healed));
654 info!(" {:<col1_width$} │ Elapsed: {}", col1, phase_elapsed_str);
655 let col1 = format!("Rate: {} accounts/s", format_thousands(rate));
656 info!(" {:<col1_width$} │ Peers: {}", col1, peer_count);
657 info!(" Total time: {}", total_elapsed);
658 }
659 CurrentStepValue::RequestingBytecodes => {
660 let bytecodes_to_download = METRICS.bytecodes_to_download.load(Ordering::Relaxed);
661 let bytecodes_downloaded = METRICS.downloaded_bytecodes.load(Ordering::Relaxed);
662 let interval_downloaded = bytecodes_downloaded.saturating_sub(prev_interval.bytecodes);
663 let percentage = if bytecodes_to_download == 0 {
664 0.0
665 } else {
666 (bytecodes_downloaded as f64 / bytecodes_to_download as f64) * 100.0
667 };
668 let rate = interval_downloaded / PROGRESS_INTERVAL_SECS;
669
670 let progress = progress_bar(percentage, 40);
671 info!(" {} {:>5.1}%", progress, percentage);
672 info!("");
673 let col1 = format!(
674 "Bytecodes: {} / {}",
675 format_thousands(bytecodes_downloaded),
676 format_thousands(bytecodes_to_download)
677 );
678 info!(" {:<col1_width$} │ Elapsed: {}", col1, phase_elapsed_str);
679 let col1 = format!("Rate: {} codes/s", format_thousands(rate));
680 info!(" {:<col1_width$} │ Peers: {}", col1, peer_count);
681 info!(" Total time: {}", total_elapsed);
682 }
683 CurrentStepValue::None => {}
684 }
685}
686
/// Copies the sync counters relevant to `step` from `METRICS` into the
/// Prometheus gauges of `METRICS_SYNC`.
///
/// The stage number and pivot block are always written. The pivot timestamp
/// and pivot age are written only while the stored pivot timestamp is
/// non-zero, so a zero reading leaves the previously exported values as
/// they were.
#[cfg(feature = "metrics")]
fn push_sync_prometheus_metrics(step: CurrentStepValue) {
    use CurrentStepValue as Step;
    use ethrex_metrics::sync::METRICS_SYNC;
    use std::sync::atomic::Ordering::Relaxed;

    let stage = phase_info(step).0;
    METRICS_SYNC.stage.set(stage as i64);

    let pivot_block = METRICS.sync_head_block.load(Relaxed);
    METRICS_SYNC.pivot_block.set(pivot_block as i64);

    let pivot_ts = METRICS.pivot_timestamp.load(Relaxed);
    if pivot_ts > 0 {
        METRICS_SYNC.pivot_timestamp.set(pivot_ts as i64);

        // Age of the pivot relative to the current wall clock.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let age = now.saturating_sub(pivot_ts);
        METRICS_SYNC.pivot_age_seconds.set(age as i64);
    }

    match step {
        Step::None => {}
        Step::DownloadingHeaders => {
            let total = METRICS.sync_head_block.load(Relaxed);
            // Never report more downloaded headers than the target.
            let downloaded = METRICS.downloaded_headers.get().min(total);
            METRICS_SYNC.headers_downloaded.set(downloaded as i64);
            METRICS_SYNC.headers_total.set(total as i64);
        }
        Step::RequestingAccountRanges => {
            let downloaded = METRICS.downloaded_account_tries.load(Relaxed);
            METRICS_SYNC.accounts_downloaded.set(downloaded as i64);
        }
        Step::InsertingAccountRanges | Step::InsertingAccountRangesNoDb => {
            let total = METRICS.downloaded_account_tries.load(Relaxed);
            let inserted = METRICS.account_tries_inserted.load(Relaxed);
            METRICS_SYNC.accounts_downloaded.set(total as i64);
            METRICS_SYNC.accounts_inserted.set(inserted as i64);
        }
        Step::RequestingStorageRanges => {
            let downloaded = METRICS.storage_leaves_downloaded.get();
            METRICS_SYNC.storage_downloaded.set(downloaded as i64);
        }
        Step::InsertingStorageRanges => {
            let inserted = METRICS.storage_leaves_inserted.get();
            METRICS_SYNC.storage_inserted.set(inserted as i64);
        }
        Step::HealingState => {
            let healed = METRICS.global_state_trie_leafs_healed.load(Relaxed);
            METRICS_SYNC.state_leaves_healed.set(healed as i64);
        }
        Step::HealingStorage => {
            let healed = METRICS.global_storage_tries_leafs_healed.load(Relaxed);
            METRICS_SYNC.storage_leaves_healed.set(healed as i64);
        }
        Step::RequestingBytecodes => {
            let total = METRICS.bytecodes_to_download.load(Relaxed);
            let downloaded = METRICS.downloaded_bytecodes.load(Relaxed);
            METRICS_SYNC.bytecodes_downloaded.set(downloaded as i64);
            METRICS_SYNC.bytecodes_total.set(total as i64);
        }
    }
}
758
/// Renders a text progress bar of exactly `width` cells.
///
/// `percentage` is clamped to `0.0..=100.0`; the number of filled cells is
/// the clamped share of `width`, truncated towards zero.
fn progress_bar(percentage: f64, width: usize) -> String {
    let share = percentage.clamp(0.0, 100.0) / 100.0;
    let solid = usize::min((share * width as f64) as usize, width);

    (0..width)
        .map(|cell| if cell < solid { '▓' } else { '░' })
        .collect()
}
766
/// Formats `n` in decimal with a comma between every group of three digits,
/// e.g. `1234567` becomes `"1,234,567"`.
fn format_thousands(n: u64) -> String {
    let digits = n.to_string();
    let len = digits.len();
    let mut grouped = String::with_capacity(len + len / 3);

    for (idx, digit) in digits.chars().enumerate() {
        // A separator precedes every digit that starts a full group of three
        // counted from the right, except the very first digit.
        if idx != 0 && (len - idx) % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(digit);
    }

    grouped
}
778
779pub async fn periodically_show_peer_stats_after_sync(peer_table: &PeerTable) {
781 const INTERVAL_DURATION: tokio::time::Duration = tokio::time::Duration::from_secs(60);
782 let mut interval = tokio::time::interval(INTERVAL_DURATION);
783 loop {
784 let peers: Vec<PeerData> = peer_table.get_peers_data().await.unwrap_or(Vec::new());
786 let active_peers = peers
787 .iter()
788 .filter(|peer| -> bool { peer.connection.as_ref().is_some() })
789 .count();
790 let snap_active_peers = peers
791 .iter()
792 .filter(|peer| -> bool {
793 peer.connection.as_ref().is_some()
794 && SUPPORTED_SNAP_CAPABILITIES
795 .iter()
796 .any(|cap| peer.supported_capabilities.contains(cap))
797 })
798 .count();
799 info!("Peers: {active_peers} (snap-capable: {snap_active_peers})");
800 interval.tick().await;
801 }
802}
803
/// Formats `duration` as `HH:MM:SS`, dropping sub-second precision. Hours
/// are not wrapped, so they may use more than two digits.
fn format_duration(duration: Duration) -> String {
    let whole_seconds = duration.as_secs();
    let (hours, within_hour) = (whole_seconds / 3600, whole_seconds % 3600);
    format!(
        "{:02}:{:02}:{:02}",
        hours,
        within_hour / 60,
        within_hour % 60
    )
}