Skip to main content

ethrex_p2p/
peer_handler.rs

1use crate::rlpx::initiator::RLPxInitiator;
2use crate::{
3    metrics::{CurrentStepValue, METRICS},
4    peer_table::{
5        PeerData, PeerDiagnostics, PeerTable, PeerTableServerProtocol as _, RequestPermit,
6    },
7    rlpx::{
8        connection::server::PeerConnection,
9        error::PeerConnectionError,
10        eth::{
11            block_access_lists::{BlockAccessLists, GetBlockAccessLists},
12            blocks::{
13                BLOCK_HEADER_LIMIT, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
14                HashOrNumber,
15            },
16        },
17        message::Message as RLPxMessage,
18        p2p::{Capability, SUPPORTED_ETH_CAPABILITIES},
19    },
20};
21use ethrex_common::{
22    H256,
23    types::{BlockBody, BlockHeader, block_access_list::BlockAccessList, validate_block_body},
24};
25use ethrex_crypto::NativeCrypto;
26use spawned_concurrency::{error::ActorError, tasks::ActorRef};
27use std::{
28    collections::{HashSet, VecDeque},
29    sync::atomic::Ordering,
30    time::{Duration, SystemTime},
31};
32use tracing::{debug, error, trace, warn};
33
34// Re-export constants from snap::constants for backward compatibility
35pub use crate::snap::constants::{
36    HASH_MAX, MAX_BLOCK_BODIES_TO_REQUEST, MAX_HEADER_CHUNK, MAX_RESPONSE_BYTES,
37    PEER_REPLY_TIMEOUT, PEER_SELECT_RETRY_ATTEMPTS, RANGE_FILE_CHUNK_SIZE, REQUEST_RETRY_ATTEMPTS,
38    SNAP_LIMIT,
39};
40
41// Re-export snap client types for backward compatibility
42pub use crate::snap::{DumpError, RequestMetadata, RequestStorageTrieNodesError, SnapError};
43
44/// An abstraction over the [Kademlia] containing logic to make requests to peers
45#[derive(Debug, Clone)]
46pub struct PeerHandler {
47    pub peer_table: PeerTable,
48    pub initiator: ActorRef<RLPxInitiator>,
49}
50
/// Direction in which a block-header request walks the chain from its starting block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockRequestOrder {
    /// Ascending: from the starting block towards newer blocks.
    OldToNew,
    /// Descending: from the starting block towards older blocks (sent as a `reverse` request).
    NewToOld,
}
55
56/// Result of a block-header request, distinguishing why no headers came back so sync
57/// diagnostics can tell a connectivity problem from peers withholding data.
58#[derive(Debug)]
59pub enum HeaderFetchOutcome {
60    /// Headers were obtained from a peer.
61    Headers(Vec<BlockHeader>),
62    /// No suitable peer was available to send the request to (e.g. no eth-capable peer
63    /// connected, or all are busy / penalized).
64    NoPeerAvailable,
65    /// A peer was queried but returned no usable response (timeout, empty, or unchained).
66    PeerFailed,
67}
68
69impl HeaderFetchOutcome {
70    /// A short, log-friendly reason for a non-`Headers` outcome.
71    pub fn failure_reason(&self) -> &'static str {
72        match self {
73            HeaderFetchOutcome::Headers(_) => "headers received",
74            HeaderFetchOutcome::NoPeerAvailable => {
75                "no eth-capable peer with a live connection to query (peers may be connecting or recently dropped)"
76            }
77            HeaderFetchOutcome::PeerFailed => "peer(s) queried but did not serve headers",
78        }
79    }
80}
81
82/// Asks a single already-selected peer for the block number at `sync_head`.
83/// Consumes a `RequestPermit`; the permit drops on return, releasing the slot.
84async fn ask_peer_head_number(
85    peer_id: H256,
86    connection: &mut PeerConnection,
87    _permit: RequestPermit,
88    sync_head: H256,
89    retries: i32,
90) -> Result<u64, PeerHandlerError> {
91    // TODO: Better error handling
92    trace!("Sync Log 11: Requesting sync head block number from peer {peer_id}");
93    let request_id = rand::random();
94    let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
95        id: request_id,
96        startblock: HashOrNumber::Hash(sync_head),
97        limit: 1,
98        skip: 0,
99        reverse: false,
100    });
101
102    debug!("(Retry {retries}) Requesting sync head {sync_head:?} to peer {peer_id}");
103
104    match connection
105        .outgoing_request(request, PEER_REPLY_TIMEOUT)
106        .await
107    {
108        Ok(RLPxMessage::BlockHeaders(BlockHeaders {
109            id: _,
110            block_headers,
111        })) => {
112            if !block_headers.is_empty() {
113                let sync_head_number = block_headers
114                    .last()
115                    .ok_or(PeerHandlerError::BlockHeaders)?
116                    .number;
117                trace!(
118                    "Sync Log 12: Received sync head block headers from peer {peer_id}, sync head number {sync_head_number}"
119                );
120                Ok(sync_head_number)
121            } else {
122                Err(PeerHandlerError::EmptyResponseFromPeer(peer_id))
123            }
124        }
125        Ok(_other_msgs) => Err(PeerHandlerError::UnexpectedResponseFromPeer(peer_id)),
126        Err(PeerConnectionError::Timeout) => {
127            Err(PeerHandlerError::ReceiveMessageFromPeerTimeout(peer_id))
128        }
129        Err(_other_err) => Err(PeerHandlerError::ReceiveMessageFromPeer(peer_id)),
130    }
131}
132
133impl PeerHandler {
134    pub fn new(peer_table: PeerTable, initiator: ActorRef<RLPxInitiator>) -> PeerHandler {
135        Self {
136            peer_table,
137            initiator,
138        }
139    }
140
141    /// Returns a random node id and the channel ends to an active peer connection that supports the given capability
142    /// It doesn't guarantee that the selected peer is not currently busy
143    async fn get_random_peer(
144        &mut self,
145        capabilities: &[Capability],
146    ) -> Result<Option<(H256, PeerConnection, RequestPermit)>, PeerHandlerError> {
147        Ok(self
148            .peer_table
149            .get_random_peer(capabilities.to_vec())
150            .await?)
151    }
152
153    /// Number of peers known to the table that advertise the eth capabilities used for sync.
154    /// NOTE: this counts eth-capable peers regardless of whether they currently have a live
155    /// connection, so it can be greater than the number actually queryable via
156    /// `get_random_peer` (which requires a live connection). Used only for diagnostics; logged
157    /// as `eth_capable_peers`. Returns 0 on any peer-table error.
158    pub async fn eth_capable_peer_count(&self) -> usize {
159        self.peer_table
160            .peer_count_by_capabilities(SUPPORTED_ETH_CAPABILITIES.to_vec())
161            .await
162            .unwrap_or(0)
163    }
164
165    /// Requests block headers from any suitable peer, starting from the `start` block hash towards either older or newer blocks depending on the order
166    /// Returns the block headers or None if:
167    /// - There are no available peers (the node just started up or was rejected by all other nodes)
168    /// - No peer returned a valid response in the given time and retry limits
169    pub async fn request_block_headers(
170        &mut self,
171        start: u64,
172        sync_head: H256,
173    ) -> Result<Option<Vec<BlockHeader>>, PeerHandlerError> {
174        let start_time = SystemTime::now();
175        METRICS
176            .current_step
177            .set(CurrentStepValue::DownloadingHeaders);
178
179        let mut ret = Vec::<BlockHeader>::new();
180
181        let mut sync_head_number = 0_u64;
182
183        let sync_head_number_retrieval_start = SystemTime::now();
184
185        debug!("Retrieving sync head block number from peers");
186
187        let mut retries = 1;
188
189        // Ask up to MAX_PEERS_TO_ASK peers per retry (no point asking 40+
190        // peers sequentially with a 15s timeout each).
191        const MAX_PEERS_TO_ASK: usize = 5;
192        const MAX_RETRIES: i32 = 3;
193
194        while sync_head_number == 0 {
195            if retries > MAX_RETRIES {
196                // sync_head is unknown to our peers
197                return Ok(None);
198            }
199            let peers = self
200                .peer_table
201                .get_best_n_peers(SUPPORTED_ETH_CAPABILITIES.to_vec(), MAX_PEERS_TO_ASK)
202                .await?;
203
204            let selected_peers: Vec<_> = peers.iter().map(|(id, _, _)| *id).collect();
205            debug!(
206                retry = retries,
207                peers_selected = ?selected_peers,
208                "request_block_headers: resolving sync head with peers"
209            );
210            for (peer_id, mut connection, permit) in peers {
211                match ask_peer_head_number(peer_id, &mut connection, permit, sync_head, retries)
212                    .await
213                {
214                    Ok(number) => {
215                        sync_head_number = number;
216                        if number != 0 {
217                            #[cfg(feature = "metrics")]
218                            ethrex_metrics::sync::METRICS_SYNC.inc_header_resolution("found");
219                            break;
220                        }
221                        #[cfg(feature = "metrics")]
222                        ethrex_metrics::sync::METRICS_SYNC.inc_header_resolution("unknown");
223                    }
224                    Err(err) => {
225                        #[cfg(feature = "metrics")]
226                        ethrex_metrics::sync::METRICS_SYNC.inc_header_resolution("timeout");
227                        debug!(
228                            "Sync Log 13: Failed to retrieve sync head block number from peer {peer_id}: {err}"
229                        );
230                    }
231                }
232            }
233
234            retries += 1;
235        }
236        METRICS
237            .sync_head_block
238            .store(sync_head_number, Ordering::Relaxed);
239        sync_head_number = sync_head_number.min(start + MAX_HEADER_CHUNK);
240
241        let sync_head_number_retrieval_elapsed = sync_head_number_retrieval_start
242            .elapsed()
243            .unwrap_or_default();
244
245        debug!("Sync head block number retrieved");
246
247        *METRICS.time_to_retrieve_sync_head_block.lock().await =
248            Some(sync_head_number_retrieval_elapsed);
249        *METRICS.sync_head_hash.lock().await = sync_head;
250
251        let block_count = sync_head_number + 1 - start;
252        let chunk_count = if block_count < 800_u64 { 1 } else { 800_u64 };
253
254        // 2) partition the amount of headers in `K` tasks
255        let chunk_limit = block_count / chunk_count;
256
257        // list of tasks to be executed
258        let mut tasks_queue_not_started = VecDeque::<(u64, u64)>::new();
259
260        for i in 0..chunk_count {
261            tasks_queue_not_started.push_back((i * chunk_limit + start, chunk_limit));
262        }
263
264        // Push the reminder
265        if !block_count.is_multiple_of(chunk_count) {
266            tasks_queue_not_started
267                .push_back((chunk_count * chunk_limit + start, block_count % chunk_count));
268        }
269
270        let mut downloaded_count = 0_u64;
271
272        // channel to send the tasks to the peers
273        let (task_sender, mut task_receiver) =
274            tokio::sync::mpsc::channel::<(Vec<BlockHeader>, H256, PeerConnection, u64, u64)>(1000);
275
276        let mut current_show = 0;
277
278        // 3) create tasks that will request a chunk of headers from a peer
279
280        debug!("Starting to download block headers from peers");
281
282        *METRICS.headers_download_start_time.lock().await = Some(SystemTime::now());
283
284        let mut logged_no_free_peers_count = 0;
285
286        loop {
287            if let Ok((headers, peer_id, _connection, startblock, previous_chunk_limit)) =
288                task_receiver.try_recv()
289            {
290                trace!("We received a download chunk from peer");
291                if headers.is_empty() {
292                    self.peer_table.record_failure(peer_id)?;
293
294                    debug!("Failed to download chunk from peer. Downloader {peer_id} freed");
295
296                    // reinsert the task to the queue
297                    tasks_queue_not_started.push_back((startblock, previous_chunk_limit));
298
299                    continue; // Retry with the next peer
300                }
301
302                downloaded_count += headers.len() as u64;
303
304                METRICS.downloaded_headers.inc_by(headers.len() as u64);
305
306                let batch_show = downloaded_count / 10_000;
307
308                if current_show < batch_show {
309                    debug!(
310                        "Downloaded {} headers from peer {} (current count: {downloaded_count})",
311                        headers.len(),
312                        peer_id
313                    );
314                    current_show += 1;
315                }
316                // store headers!!!!
317                ret.extend_from_slice(&headers);
318
319                let downloaded_headers = headers.len() as u64;
320
321                // reinsert the task to the queue if it was not completed
322                if downloaded_headers < previous_chunk_limit {
323                    let new_start = startblock + headers.len() as u64;
324
325                    let new_chunk_limit = previous_chunk_limit - headers.len() as u64;
326
327                    debug!(
328                        "Task for ({startblock}, {new_chunk_limit}) was not completed, re-adding to the queue, {new_chunk_limit} remaining headers"
329                    );
330
331                    tasks_queue_not_started.push_back((new_start, new_chunk_limit));
332                }
333
334                self.peer_table.record_success(peer_id)?;
335                debug!("Downloader {peer_id} freed");
336            }
337            let Some((peer_id, mut connection, permit)) = self
338                .peer_table
339                .get_best_peer(SUPPORTED_ETH_CAPABILITIES.to_vec())
340                .await?
341            else {
342                // Log ~ once every 10 seconds
343                if logged_no_free_peers_count == 0 {
344                    trace!("We are missing peers in request_block_headers");
345                    logged_no_free_peers_count = 1000;
346                }
347                logged_no_free_peers_count -= 1;
348                // Sleep a bit to avoid busy polling
349                tokio::time::sleep(Duration::from_millis(10)).await;
350                continue;
351            };
352
353            let Some((startblock, chunk_limit)) = tasks_queue_not_started.pop_front() else {
354                if downloaded_count >= block_count {
355                    debug!("All headers downloaded successfully");
356                    break;
357                }
358
359                let batch_show = downloaded_count / 10_000;
360
361                if current_show < batch_show {
362                    current_show += 1;
363                }
364
365                // Queue drained but in-flight tasks haven't returned yet.
366                // Drop the permit we just acquired (end of scope) and yield
367                // so the result receive path gets a chance to run.
368                tokio::task::yield_now().await;
369                continue;
370            };
371            let tx = task_sender.clone();
372            debug!("Downloader {peer_id} is now busy");
373
374            tokio::spawn(async move {
375                trace!(
376                    "Sync Log 5: Requesting block headers from peer {peer_id}, chunk_limit: {chunk_limit}"
377                );
378                let headers = Self::download_chunk_from_peer(
379                    peer_id,
380                    &mut connection,
381                    permit,
382                    startblock,
383                    chunk_limit,
384                )
385                .await
386                .inspect_err(|err| trace!("Sync Log 6: {peer_id} failed to download chunk: {err}"))
387                .unwrap_or_default();
388
389                tx.send((headers, peer_id, connection, startblock, chunk_limit))
390                    .await
391                    .inspect_err(|err| {
392                        error!("Failed to send headers result through channel. Error: {err}")
393                    })
394            });
395        }
396
397        let elapsed = start_time.elapsed().unwrap_or_default();
398
399        debug!(
400            "Downloaded all headers ({}) in {} seconds",
401            ret.len(),
402            format_duration(elapsed)
403        );
404
405        {
406            let downloaded_headers = ret.len();
407            let unique_headers = ret.iter().map(|h| h.hash()).collect::<HashSet<_>>();
408
409            debug!(
410                "Downloaded {} headers, unique: {}, duplicates: {}",
411                downloaded_headers,
412                unique_headers.len(),
413                downloaded_headers - unique_headers.len()
414            );
415
416            match downloaded_headers.cmp(&unique_headers.len()) {
417                std::cmp::Ordering::Equal => {
418                    debug!("All downloaded headers are unique");
419                }
420                std::cmp::Ordering::Greater => {
421                    debug!(
422                        "Downloaded headers contain duplicates, {} duplicates found",
423                        downloaded_headers - unique_headers.len()
424                    );
425                }
426                std::cmp::Ordering::Less => {
427                    debug!(
428                        "Downloaded headers are less than unique headers, this should not happen"
429                    );
430                }
431            }
432        }
433
434        ret.sort_by(|x, y| x.number.cmp(&y.number));
435        Ok(Some(ret))
436    }
437
438    /// Requests block headers from any suitable peer, starting from the `start` block hash towards either older or newer blocks depending on the order
439    /// - No peer returned a valid response in the given time and retry limits
440    ///   Since request_block_headers brought problems in cases of reorg seen in this pr https://github.com/lambdaclass/ethrex/pull/4028, we have this other function to request block headers only for full sync.
441    pub async fn request_block_headers_from_hash(
442        &mut self,
443        start: H256,
444        order: BlockRequestOrder,
445    ) -> Result<HeaderFetchOutcome, PeerHandlerError> {
446        let request_id = rand::random();
447        let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
448            id: request_id,
449            startblock: start.into(),
450            limit: BLOCK_HEADER_LIMIT,
451            skip: 0,
452            reverse: matches!(order, BlockRequestOrder::NewToOld),
453        });
454        match self.get_random_peer(&SUPPORTED_ETH_CAPABILITIES).await? {
455            None => Ok(HeaderFetchOutcome::NoPeerAvailable),
456            Some((peer_id, mut connection, permit)) => {
457                let response = connection
458                    .outgoing_request(request, PEER_REPLY_TIMEOUT)
459                    .await;
460                drop(permit);
461                if let Ok(RLPxMessage::BlockHeaders(BlockHeaders {
462                    id: _,
463                    block_headers,
464                })) = response
465                {
466                    if block_headers.is_empty() {
467                        // Empty response is valid per eth spec (peer may not have these blocks),
468                        // so apply only a soft score penalty (`record_failure`) rather than
469                        // ejecting the peer (`set_disposable`): a spec-conformant peer that simply
470                        // lacks a fork's blocks shouldn't be permanently dropped from rotation.
471                        // Genuine misbehavior below (unchained / wrong-chain-start) uses the same
472                        // soft tier, so the distinction stays consistent.
473                        debug!(
474                            "[SYNCING] Received empty headers from peer {peer_id}, trying another"
475                        );
476                        self.peer_table.record_failure(peer_id)?;
477                        return Ok(HeaderFetchOutcome::PeerFailed);
478                    }
479                    if are_block_headers_chained(&block_headers, &order) {
480                        // Pin the response to the requested `start` hash. `are_block_headers_chained`
481                        // only verifies internal parent-hash linkage, not that the sequence actually
482                        // begins at `start`. A peer on a fork/minority chain can return an internally
483                        // consistent run of headers from its own chain that does NOT start at `start`;
484                        // accepting it derails the sync walk onto the wrong chain — it never reconciles
485                        // to our canonical head and walks all the way to genesis. Reject the mismatch and
486                        // penalize the peer so the caller re-rolls `get_random_peer` and keeps trying until
487                        // it lands a peer actually serving `start`'s chain (whose ancestry is hash-linked
488                        // and therefore bridges down to our canonical head). General hardening, not
489                        // devnet-specific.
490                        if block_headers[0].hash() != start {
491                            warn!(
492                                "[SYNCING] Peer {peer_id} returned headers not starting at the requested hash {start:#x}, penalizing peer"
493                            );
494                            self.peer_table.record_failure(peer_id)?;
495                            return Ok(HeaderFetchOutcome::PeerFailed);
496                        }
497                        self.peer_table.record_success(peer_id)?;
498                        return Ok(HeaderFetchOutcome::Headers(block_headers));
499                    }
500                    // Non-empty but unchained headers is a protocol violation
501                    debug!(
502                        "Received invalid (unchained) headers from peer, penalizing peer {peer_id}"
503                    );
504                    self.peer_table.record_failure(peer_id)?;
505                    return Ok(HeaderFetchOutcome::PeerFailed);
506                }
507                // Timeout or invalid response - mark peer as disposable
508                debug!("Didn't receive block headers from peer, penalizing peer {peer_id}");
509                self.peer_table.record_failure(peer_id)?;
510                Ok(HeaderFetchOutcome::PeerFailed)
511            }
512        }
513    }
514
515    /// Given a peer id, a chunk start and a chunk limit, requests the block headers from the peer.
516    /// Releases the peer slot as soon as the wire response is in; validation
517    /// below is pure computation.
518    async fn download_chunk_from_peer(
519        peer_id: H256,
520        connection: &mut PeerConnection,
521        permit: RequestPermit,
522        startblock: u64,
523        chunk_limit: u64,
524    ) -> Result<Vec<BlockHeader>, PeerHandlerError> {
525        debug!("Requesting block headers from peer {peer_id}");
526        let request_id = rand::random();
527        let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
528            id: request_id,
529            startblock: HashOrNumber::Number(startblock),
530            limit: chunk_limit,
531            skip: 0,
532            reverse: false,
533        });
534        let response = connection
535            .outgoing_request(request, PEER_REPLY_TIMEOUT)
536            .await;
537        drop(permit);
538        if let Ok(RLPxMessage::BlockHeaders(BlockHeaders {
539            id: _,
540            block_headers,
541        })) = response
542        {
543            if are_block_headers_chained(&block_headers, &BlockRequestOrder::OldToNew) {
544                Ok(block_headers)
545            } else {
546                debug!("Received invalid headers from peer: {peer_id}");
547                Err(PeerHandlerError::InvalidHeaders)
548            }
549        } else {
550            Err(PeerHandlerError::BlockHeaders)
551        }
552    }
553
554    /// Internal method to request block bodies from any suitable peer given their block hashes
555    /// Returns the block bodies or None if:
556    /// - There are no available peers (the node just started up or was rejected by all other nodes)
557    /// - The requested peer did not return a valid response in the given time limit
558    async fn request_block_bodies_inner(
559        &mut self,
560        block_hashes: &[H256],
561    ) -> Result<Option<(Vec<BlockBody>, H256)>, PeerHandlerError> {
562        let block_hashes_len = block_hashes.len();
563        let request_id = rand::random();
564        let request = RLPxMessage::GetBlockBodies(GetBlockBodies {
565            id: request_id,
566            block_hashes: block_hashes.to_vec(),
567        });
568        match self.get_random_peer(&SUPPORTED_ETH_CAPABILITIES).await? {
569            None => Ok(None),
570            Some((peer_id, mut connection, permit)) => {
571                let response = connection
572                    .outgoing_request(request, PEER_REPLY_TIMEOUT)
573                    .await;
574                drop(permit);
575                if let Ok(RLPxMessage::BlockBodies(BlockBodies {
576                    id: _,
577                    block_bodies,
578                })) = response
579                {
580                    // Check that the response is not empty and does not contain more bodies than the ones requested
581                    if !block_bodies.is_empty() && block_bodies.len() <= block_hashes_len {
582                        self.peer_table.record_success(peer_id)?;
583                        return Ok(Some((block_bodies, peer_id)));
584                    }
585                }
586                debug!("Didn't receive block bodies from peer, penalizing peer {peer_id}");
587                self.peer_table.record_failure(peer_id)?;
588                let _ = self.peer_table.set_disposable(peer_id);
589                Ok(None)
590            }
591        }
592    }
593
594    /// Requests block bodies from any suitable peer given their block headers and validates them
595    /// Returns the requested block bodies or None if:
596    /// - There are no available peers (the node just started up or was rejected by all other nodes)
597    /// - No peer returned a valid response in the given time and retry limits
598    /// - The block bodies are invalid given the block headers
599    pub async fn request_block_bodies(
600        &mut self,
601        block_headers: &[BlockHeader],
602    ) -> Result<Option<Vec<BlockBody>>, PeerHandlerError> {
603        let block_hashes: Vec<H256> = block_headers.iter().map(|h| h.hash()).collect();
604
605        for _ in 0..REQUEST_RETRY_ATTEMPTS {
606            let Some((block_bodies, peer_id)) =
607                self.request_block_bodies_inner(&block_hashes).await?
608            else {
609                continue; // Retry on empty response
610            };
611            let mut res = Vec::new();
612            let mut validation_success = true;
613            for (header, body) in block_headers[..block_bodies.len()].iter().zip(block_bodies) {
614                if let Err(e) = validate_block_body(header, &body, &NativeCrypto) {
615                    debug!("Invalid block body error {e}, discarding peer {peer_id} and retrying");
616                    validation_success = false;
617                    self.peer_table.record_critical_failure(peer_id)?;
618                    break;
619                }
620                res.push(body);
621            }
622            // Retry on validation failure
623            if validation_success {
624                return Ok(Some(res));
625            }
626        }
627        Ok(None)
628    }
629
630    /// Requests block access lists from a peer that supports eth/71.
631    /// Returns a vector of optional BALs (one per requested block hash) or None if:
632    /// - There are no available eth/71 peers
633    /// - The peer did not respond in time
634    pub async fn request_block_access_lists(
635        &mut self,
636        block_hashes: &[H256],
637    ) -> Result<Option<Vec<Option<BlockAccessList>>>, PeerHandlerError> {
638        let request_id = rand::random();
639        let request = RLPxMessage::GetBlockAccessLists(GetBlockAccessLists {
640            id: request_id,
641            block_hashes: block_hashes.to_vec(),
642        });
643        match self.get_random_peer(&[Capability::eth(71)]).await? {
644            None => Ok(None),
645            Some((peer_id, mut connection, permit)) => {
646                let response = connection
647                    .outgoing_request(request, PEER_REPLY_TIMEOUT)
648                    .await;
649                drop(permit);
650                match response {
651                    Ok(RLPxMessage::BlockAccessLists(BlockAccessLists {
652                        id,
653                        block_access_lists,
654                    })) if id == request_id => {
655                        self.peer_table.record_success(peer_id)?;
656                        Ok(Some(block_access_lists))
657                    }
658                    _ => {
659                        debug!("Didn't receive block access lists from peer {peer_id}");
660                        self.peer_table.record_failure(peer_id)?;
661                        Ok(None)
662                    }
663                }
664            }
665        }
666    }
667
668    /// Returns diagnostic snapshots for all connected peers (scores, requests, eligibility).
669    pub async fn read_peer_diagnostics(&self) -> Vec<PeerDiagnostics> {
670        self.peer_table
671            .get_peer_diagnostics()
672            .await
673            .unwrap_or_default()
674    }
675
676    /// Returns the PeerData for each connected Peer
677    pub async fn read_connected_peers(&mut self) -> Vec<PeerData> {
678        self.peer_table
679            .get_peers_data()
680            .await
681            // Proper error handling
682            .unwrap_or(Vec::new())
683    }
684
685    pub async fn count_total_peers(&mut self) -> Result<usize, PeerHandlerError> {
686        Ok(self.peer_table.peer_count().await?)
687    }
688
689    /// Requests a single block header by number from an already-selected peer.
690    /// Consumes a `RequestPermit` reserved by the caller at peer selection
691    /// time; the permit drops when this function returns, releasing the slot.
692    pub async fn get_block_header(
693        &mut self,
694        connection: &mut PeerConnection,
695        _permit: RequestPermit,
696        block_number: u64,
697    ) -> Result<Option<BlockHeader>, PeerHandlerError> {
698        let request_id = rand::random();
699        let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
700            id: request_id,
701            startblock: HashOrNumber::Number(block_number),
702            limit: 1,
703            skip: 0,
704            reverse: false,
705        });
706        debug!("get_block_header: requesting header with number {block_number}");
707        match connection
708            .outgoing_request(request, PEER_REPLY_TIMEOUT)
709            .await
710        {
711            Ok(RLPxMessage::BlockHeaders(BlockHeaders {
712                id: _,
713                block_headers,
714            })) => {
715                if !block_headers.is_empty() {
716                    return Ok(Some(
717                        block_headers
718                            .last()
719                            .ok_or(PeerHandlerError::BlockHeaders)?
720                            .clone(),
721                    ));
722                }
723            }
724            Ok(_other_msgs) => {
725                debug!("Received unexpected message from peer");
726            }
727            Err(PeerConnectionError::Timeout) => {
728                debug!("Timeout while waiting for sync head from peer");
729            }
730            // TODO: we need to check, this seems a scenario where the peer channel does teardown
731            // after we sent the backend message
732            Err(_) => {
733                debug!("Peer connection closed while waiting for response");
734            }
735        }
736
737        Ok(None)
738    }
739}
740
741/// Validates the block headers received from a peer by checking that the parent hash of each header
742/// matches the hash of the previous one, i.e. the headers are chained
743fn are_block_headers_chained(block_headers: &[BlockHeader], order: &BlockRequestOrder) -> bool {
744    block_headers.windows(2).all(|headers| match order {
745        BlockRequestOrder::OldToNew => headers[1].parent_hash == headers[0].hash(),
746        BlockRequestOrder::NewToOld => headers[0].parent_hash == headers[1].hash(),
747    })
748}
749
/// Renders a duration as `HHh MMm SSs`, discarding sub-second precision. Each component is
/// zero-padded to at least two digits; hours are not capped.
fn format_duration(duration: Duration) -> String {
    let whole_seconds = duration.as_secs();
    let (hours, leftover) = (whole_seconds / 3600, whole_seconds % 3600);
    let (minutes, seconds) = (leftover / 60, leftover % 60);

    format!("{hours:02}h {minutes:02}m {seconds:02}s")
}
758
759#[derive(thiserror::Error, Debug)]
760pub enum PeerHandlerError {
761    #[error("Failed to send message to peer: {0}")]
762    SendMessageToPeer(String),
763    #[error("Failed to receive block headers")]
764    BlockHeaders,
765    #[error("Received unexpected response from peer {0}")]
766    UnexpectedResponseFromPeer(H256),
767    #[error("Received an empty response from peer {0}")]
768    EmptyResponseFromPeer(H256),
769    #[error("Failed to receive message from peer {0}")]
770    ReceiveMessageFromPeer(H256),
771    #[error("Timeout while waiting for message from peer {0}")]
772    ReceiveMessageFromPeerTimeout(H256),
773    #[error("Received invalid headers")]
774    InvalidHeaders,
775    #[error("Storage Full")]
776    StorageFull,
777    #[error("No response from peer")]
778    NoResponseFromPeer,
779    #[error("Error in Peer Table: {0}")]
780    PeerTableError(#[from] ActorError),
781    #[error("Snap error: {0}")]
782    Snap(#[from] SnapError),
783}
784
785impl PeerHandlerError {
786    /// Transient errors caused by individual peer interactions (bad/slow/absent
787    /// responses) or actor-request timeouts that should trigger a retry.
788    /// Storage/snap failures and stopped actors indicate a more fundamental
789    /// problem and should be surfaced as fatal.
790    pub fn is_recoverable(&self) -> bool {
791        match self {
792            PeerHandlerError::SendMessageToPeer(_)
793            | PeerHandlerError::BlockHeaders
794            | PeerHandlerError::UnexpectedResponseFromPeer(_)
795            | PeerHandlerError::EmptyResponseFromPeer(_)
796            | PeerHandlerError::ReceiveMessageFromPeer(_)
797            | PeerHandlerError::ReceiveMessageFromPeerTimeout(_)
798            | PeerHandlerError::InvalidHeaders
799            | PeerHandlerError::NoResponseFromPeer => true,
800            // A timed-out actor request is transient (mailbox pressure or a
801            // slow handler — requests use spawned-concurrency's 5s default
802            // timeout); a stopped actor means p2p is shutting down and must
803            // stay fatal.
804            PeerHandlerError::PeerTableError(ActorError::RequestTimeout) => true,
805            PeerHandlerError::PeerTableError(ActorError::ActorStopped) => false,
806            PeerHandlerError::StorageFull | PeerHandlerError::Snap(_) => false,
807        }
808    }
809}