1use crate::rlpx::initiator::RLPxInitiator;
2use crate::{
3 metrics::{CurrentStepValue, METRICS},
4 peer_table::{
5 PeerData, PeerDiagnostics, PeerTable, PeerTableServerProtocol as _, RequestPermit,
6 },
7 rlpx::{
8 connection::server::PeerConnection,
9 error::PeerConnectionError,
10 eth::{
11 block_access_lists::{BlockAccessLists, GetBlockAccessLists},
12 blocks::{
13 BLOCK_HEADER_LIMIT, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
14 HashOrNumber,
15 },
16 },
17 message::Message as RLPxMessage,
18 p2p::{Capability, SUPPORTED_ETH_CAPABILITIES},
19 },
20};
21use ethrex_common::{
22 H256,
23 types::{BlockBody, BlockHeader, block_access_list::BlockAccessList, validate_block_body},
24};
25use ethrex_crypto::NativeCrypto;
26use spawned_concurrency::{error::ActorError, tasks::ActorRef};
27use std::{
28 collections::{HashSet, VecDeque},
29 sync::atomic::Ordering,
30 time::{Duration, SystemTime},
31};
32use tracing::{debug, error, trace, warn};
33
34pub use crate::snap::constants::{
36 HASH_MAX, MAX_BLOCK_BODIES_TO_REQUEST, MAX_HEADER_CHUNK, MAX_RESPONSE_BYTES,
37 PEER_REPLY_TIMEOUT, PEER_SELECT_RETRY_ATTEMPTS, RANGE_FILE_CHUNK_SIZE, REQUEST_RETRY_ATTEMPTS,
38 SNAP_LIMIT,
39};
40
41pub use crate::snap::{DumpError, RequestMetadata, RequestStorageTrieNodesError, SnapError};
43
44#[derive(Debug, Clone)]
46pub struct PeerHandler {
47 pub peer_table: PeerTable,
48 pub initiator: ActorRef<RLPxInitiator>,
49}
50
/// Direction in which a run of block headers is requested from (and expected back from)
/// a peer.
///
/// The enum is a plain two-state flag, so it derives the cheap value-type traits
/// (`Debug`, `Clone`, `Copy`, `PartialEq`, `Eq`) to make it usable in logs and
/// comparisons without borrowing gymnastics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockRequestOrder {
    /// Ascending order: each header's `parent_hash` is the hash of the header before it.
    OldToNew,
    /// Descending order: each header's `parent_hash` is the hash of the header after it.
    /// Requests built with this order set the `reverse` flag of `GetBlockHeaders`.
    NewToOld,
}
55
56#[derive(Debug)]
59pub enum HeaderFetchOutcome {
60 Headers(Vec<BlockHeader>),
62 NoPeerAvailable,
65 PeerFailed,
67}
68
69impl HeaderFetchOutcome {
70 pub fn failure_reason(&self) -> &'static str {
72 match self {
73 HeaderFetchOutcome::Headers(_) => "headers received",
74 HeaderFetchOutcome::NoPeerAvailable => {
75 "no eth-capable peer with a live connection to query (peers may be connecting or recently dropped)"
76 }
77 HeaderFetchOutcome::PeerFailed => "peer(s) queried but did not serve headers",
78 }
79 }
80}
81
82async fn ask_peer_head_number(
85 peer_id: H256,
86 connection: &mut PeerConnection,
87 _permit: RequestPermit,
88 sync_head: H256,
89 retries: i32,
90) -> Result<u64, PeerHandlerError> {
91 trace!("Sync Log 11: Requesting sync head block number from peer {peer_id}");
93 let request_id = rand::random();
94 let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
95 id: request_id,
96 startblock: HashOrNumber::Hash(sync_head),
97 limit: 1,
98 skip: 0,
99 reverse: false,
100 });
101
102 debug!("(Retry {retries}) Requesting sync head {sync_head:?} to peer {peer_id}");
103
104 match connection
105 .outgoing_request(request, PEER_REPLY_TIMEOUT)
106 .await
107 {
108 Ok(RLPxMessage::BlockHeaders(BlockHeaders {
109 id: _,
110 block_headers,
111 })) => {
112 if !block_headers.is_empty() {
113 let sync_head_number = block_headers
114 .last()
115 .ok_or(PeerHandlerError::BlockHeaders)?
116 .number;
117 trace!(
118 "Sync Log 12: Received sync head block headers from peer {peer_id}, sync head number {sync_head_number}"
119 );
120 Ok(sync_head_number)
121 } else {
122 Err(PeerHandlerError::EmptyResponseFromPeer(peer_id))
123 }
124 }
125 Ok(_other_msgs) => Err(PeerHandlerError::UnexpectedResponseFromPeer(peer_id)),
126 Err(PeerConnectionError::Timeout) => {
127 Err(PeerHandlerError::ReceiveMessageFromPeerTimeout(peer_id))
128 }
129 Err(_other_err) => Err(PeerHandlerError::ReceiveMessageFromPeer(peer_id)),
130 }
131}
132
133impl PeerHandler {
134 pub fn new(peer_table: PeerTable, initiator: ActorRef<RLPxInitiator>) -> PeerHandler {
135 Self {
136 peer_table,
137 initiator,
138 }
139 }
140
141 async fn get_random_peer(
144 &mut self,
145 capabilities: &[Capability],
146 ) -> Result<Option<(H256, PeerConnection, RequestPermit)>, PeerHandlerError> {
147 Ok(self
148 .peer_table
149 .get_random_peer(capabilities.to_vec())
150 .await?)
151 }
152
153 pub async fn eth_capable_peer_count(&self) -> usize {
159 self.peer_table
160 .peer_count_by_capabilities(SUPPORTED_ETH_CAPABILITIES.to_vec())
161 .await
162 .unwrap_or(0)
163 }
164
165 pub async fn request_block_headers(
170 &mut self,
171 start: u64,
172 sync_head: H256,
173 ) -> Result<Option<Vec<BlockHeader>>, PeerHandlerError> {
174 let start_time = SystemTime::now();
175 METRICS
176 .current_step
177 .set(CurrentStepValue::DownloadingHeaders);
178
179 let mut ret = Vec::<BlockHeader>::new();
180
181 let mut sync_head_number = 0_u64;
182
183 let sync_head_number_retrieval_start = SystemTime::now();
184
185 debug!("Retrieving sync head block number from peers");
186
187 let mut retries = 1;
188
189 const MAX_PEERS_TO_ASK: usize = 5;
192 const MAX_RETRIES: i32 = 3;
193
194 while sync_head_number == 0 {
195 if retries > MAX_RETRIES {
196 return Ok(None);
198 }
199 let peers = self
200 .peer_table
201 .get_best_n_peers(SUPPORTED_ETH_CAPABILITIES.to_vec(), MAX_PEERS_TO_ASK)
202 .await?;
203
204 let selected_peers: Vec<_> = peers.iter().map(|(id, _, _)| *id).collect();
205 debug!(
206 retry = retries,
207 peers_selected = ?selected_peers,
208 "request_block_headers: resolving sync head with peers"
209 );
210 for (peer_id, mut connection, permit) in peers {
211 match ask_peer_head_number(peer_id, &mut connection, permit, sync_head, retries)
212 .await
213 {
214 Ok(number) => {
215 sync_head_number = number;
216 if number != 0 {
217 #[cfg(feature = "metrics")]
218 ethrex_metrics::sync::METRICS_SYNC.inc_header_resolution("found");
219 break;
220 }
221 #[cfg(feature = "metrics")]
222 ethrex_metrics::sync::METRICS_SYNC.inc_header_resolution("unknown");
223 }
224 Err(err) => {
225 #[cfg(feature = "metrics")]
226 ethrex_metrics::sync::METRICS_SYNC.inc_header_resolution("timeout");
227 debug!(
228 "Sync Log 13: Failed to retrieve sync head block number from peer {peer_id}: {err}"
229 );
230 }
231 }
232 }
233
234 retries += 1;
235 }
236 METRICS
237 .sync_head_block
238 .store(sync_head_number, Ordering::Relaxed);
239 sync_head_number = sync_head_number.min(start + MAX_HEADER_CHUNK);
240
241 let sync_head_number_retrieval_elapsed = sync_head_number_retrieval_start
242 .elapsed()
243 .unwrap_or_default();
244
245 debug!("Sync head block number retrieved");
246
247 *METRICS.time_to_retrieve_sync_head_block.lock().await =
248 Some(sync_head_number_retrieval_elapsed);
249 *METRICS.sync_head_hash.lock().await = sync_head;
250
251 let block_count = sync_head_number + 1 - start;
252 let chunk_count = if block_count < 800_u64 { 1 } else { 800_u64 };
253
254 let chunk_limit = block_count / chunk_count;
256
257 let mut tasks_queue_not_started = VecDeque::<(u64, u64)>::new();
259
260 for i in 0..chunk_count {
261 tasks_queue_not_started.push_back((i * chunk_limit + start, chunk_limit));
262 }
263
264 if !block_count.is_multiple_of(chunk_count) {
266 tasks_queue_not_started
267 .push_back((chunk_count * chunk_limit + start, block_count % chunk_count));
268 }
269
270 let mut downloaded_count = 0_u64;
271
272 let (task_sender, mut task_receiver) =
274 tokio::sync::mpsc::channel::<(Vec<BlockHeader>, H256, PeerConnection, u64, u64)>(1000);
275
276 let mut current_show = 0;
277
278 debug!("Starting to download block headers from peers");
281
282 *METRICS.headers_download_start_time.lock().await = Some(SystemTime::now());
283
284 let mut logged_no_free_peers_count = 0;
285
286 loop {
287 if let Ok((headers, peer_id, _connection, startblock, previous_chunk_limit)) =
288 task_receiver.try_recv()
289 {
290 trace!("We received a download chunk from peer");
291 if headers.is_empty() {
292 self.peer_table.record_failure(peer_id)?;
293
294 debug!("Failed to download chunk from peer. Downloader {peer_id} freed");
295
296 tasks_queue_not_started.push_back((startblock, previous_chunk_limit));
298
299 continue; }
301
302 downloaded_count += headers.len() as u64;
303
304 METRICS.downloaded_headers.inc_by(headers.len() as u64);
305
306 let batch_show = downloaded_count / 10_000;
307
308 if current_show < batch_show {
309 debug!(
310 "Downloaded {} headers from peer {} (current count: {downloaded_count})",
311 headers.len(),
312 peer_id
313 );
314 current_show += 1;
315 }
316 ret.extend_from_slice(&headers);
318
319 let downloaded_headers = headers.len() as u64;
320
321 if downloaded_headers < previous_chunk_limit {
323 let new_start = startblock + headers.len() as u64;
324
325 let new_chunk_limit = previous_chunk_limit - headers.len() as u64;
326
327 debug!(
328 "Task for ({startblock}, {new_chunk_limit}) was not completed, re-adding to the queue, {new_chunk_limit} remaining headers"
329 );
330
331 tasks_queue_not_started.push_back((new_start, new_chunk_limit));
332 }
333
334 self.peer_table.record_success(peer_id)?;
335 debug!("Downloader {peer_id} freed");
336 }
337 let Some((peer_id, mut connection, permit)) = self
338 .peer_table
339 .get_best_peer(SUPPORTED_ETH_CAPABILITIES.to_vec())
340 .await?
341 else {
342 if logged_no_free_peers_count == 0 {
344 trace!("We are missing peers in request_block_headers");
345 logged_no_free_peers_count = 1000;
346 }
347 logged_no_free_peers_count -= 1;
348 tokio::time::sleep(Duration::from_millis(10)).await;
350 continue;
351 };
352
353 let Some((startblock, chunk_limit)) = tasks_queue_not_started.pop_front() else {
354 if downloaded_count >= block_count {
355 debug!("All headers downloaded successfully");
356 break;
357 }
358
359 let batch_show = downloaded_count / 10_000;
360
361 if current_show < batch_show {
362 current_show += 1;
363 }
364
365 tokio::task::yield_now().await;
369 continue;
370 };
371 let tx = task_sender.clone();
372 debug!("Downloader {peer_id} is now busy");
373
374 tokio::spawn(async move {
375 trace!(
376 "Sync Log 5: Requesting block headers from peer {peer_id}, chunk_limit: {chunk_limit}"
377 );
378 let headers = Self::download_chunk_from_peer(
379 peer_id,
380 &mut connection,
381 permit,
382 startblock,
383 chunk_limit,
384 )
385 .await
386 .inspect_err(|err| trace!("Sync Log 6: {peer_id} failed to download chunk: {err}"))
387 .unwrap_or_default();
388
389 tx.send((headers, peer_id, connection, startblock, chunk_limit))
390 .await
391 .inspect_err(|err| {
392 error!("Failed to send headers result through channel. Error: {err}")
393 })
394 });
395 }
396
397 let elapsed = start_time.elapsed().unwrap_or_default();
398
399 debug!(
400 "Downloaded all headers ({}) in {} seconds",
401 ret.len(),
402 format_duration(elapsed)
403 );
404
405 {
406 let downloaded_headers = ret.len();
407 let unique_headers = ret.iter().map(|h| h.hash()).collect::<HashSet<_>>();
408
409 debug!(
410 "Downloaded {} headers, unique: {}, duplicates: {}",
411 downloaded_headers,
412 unique_headers.len(),
413 downloaded_headers - unique_headers.len()
414 );
415
416 match downloaded_headers.cmp(&unique_headers.len()) {
417 std::cmp::Ordering::Equal => {
418 debug!("All downloaded headers are unique");
419 }
420 std::cmp::Ordering::Greater => {
421 debug!(
422 "Downloaded headers contain duplicates, {} duplicates found",
423 downloaded_headers - unique_headers.len()
424 );
425 }
426 std::cmp::Ordering::Less => {
427 debug!(
428 "Downloaded headers are less than unique headers, this should not happen"
429 );
430 }
431 }
432 }
433
434 ret.sort_by(|x, y| x.number.cmp(&y.number));
435 Ok(Some(ret))
436 }
437
438 pub async fn request_block_headers_from_hash(
442 &mut self,
443 start: H256,
444 order: BlockRequestOrder,
445 ) -> Result<HeaderFetchOutcome, PeerHandlerError> {
446 let request_id = rand::random();
447 let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
448 id: request_id,
449 startblock: start.into(),
450 limit: BLOCK_HEADER_LIMIT,
451 skip: 0,
452 reverse: matches!(order, BlockRequestOrder::NewToOld),
453 });
454 match self.get_random_peer(&SUPPORTED_ETH_CAPABILITIES).await? {
455 None => Ok(HeaderFetchOutcome::NoPeerAvailable),
456 Some((peer_id, mut connection, permit)) => {
457 let response = connection
458 .outgoing_request(request, PEER_REPLY_TIMEOUT)
459 .await;
460 drop(permit);
461 if let Ok(RLPxMessage::BlockHeaders(BlockHeaders {
462 id: _,
463 block_headers,
464 })) = response
465 {
466 if block_headers.is_empty() {
467 debug!(
474 "[SYNCING] Received empty headers from peer {peer_id}, trying another"
475 );
476 self.peer_table.record_failure(peer_id)?;
477 return Ok(HeaderFetchOutcome::PeerFailed);
478 }
479 if are_block_headers_chained(&block_headers, &order) {
480 if block_headers[0].hash() != start {
491 warn!(
492 "[SYNCING] Peer {peer_id} returned headers not starting at the requested hash {start:#x}, penalizing peer"
493 );
494 self.peer_table.record_failure(peer_id)?;
495 return Ok(HeaderFetchOutcome::PeerFailed);
496 }
497 self.peer_table.record_success(peer_id)?;
498 return Ok(HeaderFetchOutcome::Headers(block_headers));
499 }
500 debug!(
502 "Received invalid (unchained) headers from peer, penalizing peer {peer_id}"
503 );
504 self.peer_table.record_failure(peer_id)?;
505 return Ok(HeaderFetchOutcome::PeerFailed);
506 }
507 debug!("Didn't receive block headers from peer, penalizing peer {peer_id}");
509 self.peer_table.record_failure(peer_id)?;
510 Ok(HeaderFetchOutcome::PeerFailed)
511 }
512 }
513 }
514
515 async fn download_chunk_from_peer(
519 peer_id: H256,
520 connection: &mut PeerConnection,
521 permit: RequestPermit,
522 startblock: u64,
523 chunk_limit: u64,
524 ) -> Result<Vec<BlockHeader>, PeerHandlerError> {
525 debug!("Requesting block headers from peer {peer_id}");
526 let request_id = rand::random();
527 let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
528 id: request_id,
529 startblock: HashOrNumber::Number(startblock),
530 limit: chunk_limit,
531 skip: 0,
532 reverse: false,
533 });
534 let response = connection
535 .outgoing_request(request, PEER_REPLY_TIMEOUT)
536 .await;
537 drop(permit);
538 if let Ok(RLPxMessage::BlockHeaders(BlockHeaders {
539 id: _,
540 block_headers,
541 })) = response
542 {
543 if are_block_headers_chained(&block_headers, &BlockRequestOrder::OldToNew) {
544 Ok(block_headers)
545 } else {
546 debug!("Received invalid headers from peer: {peer_id}");
547 Err(PeerHandlerError::InvalidHeaders)
548 }
549 } else {
550 Err(PeerHandlerError::BlockHeaders)
551 }
552 }
553
554 async fn request_block_bodies_inner(
559 &mut self,
560 block_hashes: &[H256],
561 ) -> Result<Option<(Vec<BlockBody>, H256)>, PeerHandlerError> {
562 let block_hashes_len = block_hashes.len();
563 let request_id = rand::random();
564 let request = RLPxMessage::GetBlockBodies(GetBlockBodies {
565 id: request_id,
566 block_hashes: block_hashes.to_vec(),
567 });
568 match self.get_random_peer(&SUPPORTED_ETH_CAPABILITIES).await? {
569 None => Ok(None),
570 Some((peer_id, mut connection, permit)) => {
571 let response = connection
572 .outgoing_request(request, PEER_REPLY_TIMEOUT)
573 .await;
574 drop(permit);
575 if let Ok(RLPxMessage::BlockBodies(BlockBodies {
576 id: _,
577 block_bodies,
578 })) = response
579 {
580 if !block_bodies.is_empty() && block_bodies.len() <= block_hashes_len {
582 self.peer_table.record_success(peer_id)?;
583 return Ok(Some((block_bodies, peer_id)));
584 }
585 }
586 debug!("Didn't receive block bodies from peer, penalizing peer {peer_id}");
587 self.peer_table.record_failure(peer_id)?;
588 let _ = self.peer_table.set_disposable(peer_id);
589 Ok(None)
590 }
591 }
592 }
593
594 pub async fn request_block_bodies(
600 &mut self,
601 block_headers: &[BlockHeader],
602 ) -> Result<Option<Vec<BlockBody>>, PeerHandlerError> {
603 let block_hashes: Vec<H256> = block_headers.iter().map(|h| h.hash()).collect();
604
605 for _ in 0..REQUEST_RETRY_ATTEMPTS {
606 let Some((block_bodies, peer_id)) =
607 self.request_block_bodies_inner(&block_hashes).await?
608 else {
609 continue; };
611 let mut res = Vec::new();
612 let mut validation_success = true;
613 for (header, body) in block_headers[..block_bodies.len()].iter().zip(block_bodies) {
614 if let Err(e) = validate_block_body(header, &body, &NativeCrypto) {
615 debug!("Invalid block body error {e}, discarding peer {peer_id} and retrying");
616 validation_success = false;
617 self.peer_table.record_critical_failure(peer_id)?;
618 break;
619 }
620 res.push(body);
621 }
622 if validation_success {
624 return Ok(Some(res));
625 }
626 }
627 Ok(None)
628 }
629
630 pub async fn request_block_access_lists(
635 &mut self,
636 block_hashes: &[H256],
637 ) -> Result<Option<Vec<Option<BlockAccessList>>>, PeerHandlerError> {
638 let request_id = rand::random();
639 let request = RLPxMessage::GetBlockAccessLists(GetBlockAccessLists {
640 id: request_id,
641 block_hashes: block_hashes.to_vec(),
642 });
643 match self.get_random_peer(&[Capability::eth(71)]).await? {
644 None => Ok(None),
645 Some((peer_id, mut connection, permit)) => {
646 let response = connection
647 .outgoing_request(request, PEER_REPLY_TIMEOUT)
648 .await;
649 drop(permit);
650 match response {
651 Ok(RLPxMessage::BlockAccessLists(BlockAccessLists {
652 id,
653 block_access_lists,
654 })) if id == request_id => {
655 self.peer_table.record_success(peer_id)?;
656 Ok(Some(block_access_lists))
657 }
658 _ => {
659 debug!("Didn't receive block access lists from peer {peer_id}");
660 self.peer_table.record_failure(peer_id)?;
661 Ok(None)
662 }
663 }
664 }
665 }
666 }
667
668 pub async fn read_peer_diagnostics(&self) -> Vec<PeerDiagnostics> {
670 self.peer_table
671 .get_peer_diagnostics()
672 .await
673 .unwrap_or_default()
674 }
675
676 pub async fn read_connected_peers(&mut self) -> Vec<PeerData> {
678 self.peer_table
679 .get_peers_data()
680 .await
681 .unwrap_or(Vec::new())
683 }
684
685 pub async fn count_total_peers(&mut self) -> Result<usize, PeerHandlerError> {
686 Ok(self.peer_table.peer_count().await?)
687 }
688
689 pub async fn get_block_header(
693 &mut self,
694 connection: &mut PeerConnection,
695 _permit: RequestPermit,
696 block_number: u64,
697 ) -> Result<Option<BlockHeader>, PeerHandlerError> {
698 let request_id = rand::random();
699 let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
700 id: request_id,
701 startblock: HashOrNumber::Number(block_number),
702 limit: 1,
703 skip: 0,
704 reverse: false,
705 });
706 debug!("get_block_header: requesting header with number {block_number}");
707 match connection
708 .outgoing_request(request, PEER_REPLY_TIMEOUT)
709 .await
710 {
711 Ok(RLPxMessage::BlockHeaders(BlockHeaders {
712 id: _,
713 block_headers,
714 })) => {
715 if !block_headers.is_empty() {
716 return Ok(Some(
717 block_headers
718 .last()
719 .ok_or(PeerHandlerError::BlockHeaders)?
720 .clone(),
721 ));
722 }
723 }
724 Ok(_other_msgs) => {
725 debug!("Received unexpected message from peer");
726 }
727 Err(PeerConnectionError::Timeout) => {
728 debug!("Timeout while waiting for sync head from peer");
729 }
730 Err(_) => {
733 debug!("Peer connection closed while waiting for response");
734 }
735 }
736
737 Ok(None)
738 }
739}
740
741fn are_block_headers_chained(block_headers: &[BlockHeader], order: &BlockRequestOrder) -> bool {
744 block_headers.windows(2).all(|headers| match order {
745 BlockRequestOrder::OldToNew => headers[1].parent_hash == headers[0].hash(),
746 BlockRequestOrder::NewToOld => headers[0].parent_hash == headers[1].hash(),
747 })
748}
749
/// Renders a duration as `HHh MMm SSs`, dropping sub-second precision.
///
/// Each field is zero-padded to at least two digits; the hours field is not capped and
/// grows beyond two digits for durations of 100 hours or more.
fn format_duration(duration: Duration) -> String {
    const SECS_PER_MINUTE: u64 = 60;
    const SECS_PER_HOUR: u64 = 60 * SECS_PER_MINUTE;

    let whole_secs = duration.as_secs();
    let (hours, leftover) = (whole_secs / SECS_PER_HOUR, whole_secs % SECS_PER_HOUR);
    let (minutes, seconds) = (leftover / SECS_PER_MINUTE, leftover % SECS_PER_MINUTE);

    format!("{hours:02}h {minutes:02}m {seconds:02}s")
}
758
759#[derive(thiserror::Error, Debug)]
760pub enum PeerHandlerError {
761 #[error("Failed to send message to peer: {0}")]
762 SendMessageToPeer(String),
763 #[error("Failed to receive block headers")]
764 BlockHeaders,
765 #[error("Received unexpected response from peer {0}")]
766 UnexpectedResponseFromPeer(H256),
767 #[error("Received an empty response from peer {0}")]
768 EmptyResponseFromPeer(H256),
769 #[error("Failed to receive message from peer {0}")]
770 ReceiveMessageFromPeer(H256),
771 #[error("Timeout while waiting for message from peer {0}")]
772 ReceiveMessageFromPeerTimeout(H256),
773 #[error("Received invalid headers")]
774 InvalidHeaders,
775 #[error("Storage Full")]
776 StorageFull,
777 #[error("No response from peer")]
778 NoResponseFromPeer,
779 #[error("Error in Peer Table: {0}")]
780 PeerTableError(#[from] ActorError),
781 #[error("Snap error: {0}")]
782 Snap(#[from] SnapError),
783}
784
785impl PeerHandlerError {
786 pub fn is_recoverable(&self) -> bool {
791 match self {
792 PeerHandlerError::SendMessageToPeer(_)
793 | PeerHandlerError::BlockHeaders
794 | PeerHandlerError::UnexpectedResponseFromPeer(_)
795 | PeerHandlerError::EmptyResponseFromPeer(_)
796 | PeerHandlerError::ReceiveMessageFromPeer(_)
797 | PeerHandlerError::ReceiveMessageFromPeerTimeout(_)
798 | PeerHandlerError::InvalidHeaders
799 | PeerHandlerError::NoResponseFromPeer => true,
800 PeerHandlerError::PeerTableError(ActorError::RequestTimeout) => true,
805 PeerHandlerError::PeerTableError(ActorError::ActorStopped) => false,
806 PeerHandlerError::StorageFull | PeerHandlerError::Snap(_) => false,
807 }
808 }
809}