1#[cfg(feature = "l2")]
2use crate::rlpx::l2::{
3 PERIODIC_BATCH_BROADCAST_INTERVAL, PERIODIC_BLOCK_BROADCAST_INTERVAL,
4 l2_connection::{
5 self, L2Cast, L2ConnState, handle_based_capability_message, handle_l2_broadcast,
6 },
7};
8use crate::{
9 backend,
10 metrics::METRICS,
11 network::P2PContext,
12 peer_table::{PeerTable, PeerTableServerProtocol as _},
13 rlpx::{
14 Message,
15 connection::{codec::RLPxCodec, handshake},
16 error::PeerConnectionError,
17 eth::{
18 block_access_lists::{BlockAccessLists, GetBlockAccessLists},
19 blocks::{BlockBodies, BlockHeaders},
20 receipts::{
21 GetReceipts68, GetReceipts70, Receipts68, Receipts69, Receipts70,
22 SOFT_RESPONSE_LIMIT,
23 },
24 status::{StatusMessage68, StatusMessage69, StatusMessage70, StatusMessage71},
25 transactions::{GetPooledTransactions, NewPooledTransactionHashes},
26 update::BlockRangeUpdate,
27 },
28 message::EthCapVersion,
29 p2p::{
30 self, Capability, DisconnectMessage, DisconnectReason, PingMessage, PongMessage,
31 SUPPORTED_ETH_CAPABILITIES, SUPPORTED_SNAP_CAPABILITIES,
32 },
33 snap::TrieNodes,
34 },
35 snap::{
36 process_account_range_request, process_byte_codes_request, process_storage_ranges_request,
37 process_trie_nodes_request,
38 },
39 tx_broadcaster::{TxBroadcaster, TxBroadcasterProtocol as _, send_tx_hashes},
40 types::Node,
41};
42use ethrex_blockchain::Blockchain;
43use ethrex_common::H256;
44#[cfg(feature = "l2")]
45use ethrex_common::types::Transaction;
46use ethrex_common::types::{MempoolTransaction, P2PTransaction, Receipt};
47use ethrex_rlp::encode::RLPEncode;
48use ethrex_storage::{Store, error::StoreError};
49use ethrex_trie::TrieError;
50use futures::{SinkExt as _, Stream, stream::SplitSink};
51use rand::random;
52use rustc_hash::FxHashMap;
53use secp256k1::{PublicKey, SecretKey};
54use spawned_concurrency::{
55 actor,
56 error::ActorError,
57 protocol,
58 tasks::{Actor, ActorRef, ActorStart as _, Context, Handler, send_interval, spawn_listener},
59};
60use spawned_rt::tasks::BroadcastStream;
61use std::{
62 collections::HashMap,
63 net::SocketAddr,
64 sync::{Arc, RwLock},
65 time::{Duration, Instant},
66};
67use tokio::{
68 net::TcpStream,
69 sync::{broadcast, oneshot},
70 task::{self, Id},
71};
72use tokio_stream::StreamExt;
73use tokio_util::codec::Framed;
74use tracing::{debug, error, trace, warn};
75
76const PING_INTERVAL: Duration = Duration::from_secs(10);
77const BLOCK_RANGE_UPDATE_INTERVAL: Duration = Duration::from_secs(60);
78const INFLIGHT_TX_SWEEP_INTERVAL: Duration = Duration::from_secs(15);
79const INFLIGHT_TX_TIMEOUT: Duration = Duration::from_secs(30);
80const TX_REQUEST_BATCH_INTERVAL: Duration = Duration::from_millis(50);
83const SERVE_REQUEST_WINDOW: Duration = Duration::from_secs(60);
85const MAX_SERVE_REQUESTS_PER_WINDOW: u64 = 500;
87const LEECH_TX_SENT_THRESHOLD: u64 = 10_000;
89
90pub(crate) type PeerConnBroadcastSender = broadcast::Sender<(tokio::task::Id, Arc<Message>)>;
91
92#[protocol]
93pub trait PeerConnectionServerProtocol: Send + Sync {
94 fn incoming_message(&self, message: Message) -> Result<(), ActorError>;
95 fn outgoing_message(&self, message: Message) -> Result<(), ActorError>;
96 fn outgoing_request(
97 &self,
98 message: Message,
99 sender: Arc<oneshot::Sender<Message>>,
100 ) -> Result<(), ActorError>;
101 fn request_timeout(&self, id: u64) -> Result<(), ActorError>;
102 fn send_ping(&self) -> Result<(), ActorError>;
103 fn block_range_update(&self) -> Result<(), ActorError>;
104 fn broadcast_message(&self, task_id: Id, msg: Arc<Message>) -> Result<(), ActorError>;
105 fn sweep_inflight_txs(&self) -> Result<(), ActorError>;
106 fn flush_pending_tx_requests(&self) -> Result<(), ActorError>;
107 fn enqueue_tx_requests(
108 &self,
109 announcement: NewPooledTransactionHashes,
110 hashes: Vec<H256>,
111 ) -> Result<(), ActorError>;
112}
113
114#[cfg(feature = "l2")]
115#[derive(Clone)]
116pub struct L2Message {
117 pub msg: L2Cast,
118}
119
120#[cfg(feature = "l2")]
121impl spawned_concurrency::message::Message for L2Message {
122 type Result = ();
123}
124
125#[derive(Clone, Debug)]
126pub struct PeerConnection {
127 handle: ActorRef<PeerConnectionServer>,
128}
129
130impl PeerConnection {
131 pub fn spawn_as_receiver(
132 context: P2PContext,
133 peer_addr: SocketAddr,
134 stream: TcpStream,
135 ) -> PeerConnection {
136 let state = ConnectionState::Receiver(Receiver {
137 context,
138 peer_addr,
139 stream: Arc::new(stream),
140 });
141 let connection = PeerConnectionServer { state };
142 Self {
143 handle: connection.start(),
144 }
145 }
146
147 pub fn spawn_as_initiator(context: P2PContext, node: &Node) -> PeerConnection {
148 let state = ConnectionState::Initiator(Initiator {
149 context,
150 node: node.clone(),
151 });
152 let connection = PeerConnectionServer { state };
153 Self {
154 handle: connection.start(),
155 }
156 }
157
158 pub async fn outgoing_message(&mut self, message: Message) -> Result<(), PeerConnectionError> {
159 self.handle
160 .outgoing_message(message)
161 .map_err(|err| PeerConnectionError::InternalError(err.to_string()))
162 }
163
164 pub fn enqueue_tx_requests(
168 &self,
169 announcement: NewPooledTransactionHashes,
170 hashes: Vec<H256>,
171 ) -> Result<(), PeerConnectionError> {
172 self.handle
173 .enqueue_tx_requests(announcement, hashes)
174 .map_err(|err| PeerConnectionError::InternalError(err.to_string()))
175 }
176
177 pub async fn outgoing_request(
178 &mut self,
179 message: Message,
180 timeout: Duration,
181 ) -> Result<Message, PeerConnectionError> {
182 let id = message
183 .request_id()
184 .expect("Cannot wait on request without id");
185 let (oneshot_tx, oneshot_rx) = oneshot::channel::<Message>();
186
187 self.handle
188 .outgoing_request(message, Arc::new(oneshot_tx))
189 .map_err(|err| PeerConnectionError::InternalError(err.to_string()))?;
190
191 match tokio::time::timeout(timeout, oneshot_rx).await {
193 Ok(Ok(response)) => Ok(response),
194 Ok(Err(error)) => Err(PeerConnectionError::RecvError(error.to_string())),
195 Err(_timeout) => {
196 self.handle
198 .request_timeout(id)
199 .map_err(|err| PeerConnectionError::InternalError(err.to_string()))?;
200 Err(PeerConnectionError::Timeout)
202 }
203 }
204 }
205}
206
207#[derive(Debug)]
208pub struct Initiator {
209 pub(crate) context: P2PContext,
210 pub(crate) node: Node,
211}
212
213#[derive(Debug)]
214pub struct Receiver {
215 pub(crate) context: P2PContext,
216 pub(crate) peer_addr: SocketAddr,
217 pub(crate) stream: Arc<TcpStream>,
218}
219
220#[derive(Debug)]
221pub struct Established {
222 pub(crate) signer: SecretKey,
223 pub(crate) sink: SplitSink<Framed<TcpStream, RLPxCodec>, Message>,
226 pub(crate) node: Node,
227 pub(crate) storage: Store,
228 pub(crate) blockchain: Arc<Blockchain>,
229 pub(crate) capabilities: Vec<Capability>,
230 pub(crate) negotiated_eth_capability: Option<Capability>,
231 pub(crate) negotiated_snap_capability: Option<Capability>,
232 pub(crate) last_block_range_update_block: u64,
233 pub(crate) requested_pooled_txs: HashMap<u64, (NewPooledTransactionHashes, Vec<H256>, Instant)>,
236 pub(crate) pending_tx_requests: Vec<(NewPooledTransactionHashes, Vec<H256>)>,
239 pub(crate) client_version: String,
240 pub(crate) connection_broadcast_send: PeerConnBroadcastSender,
251 pub(crate) peer_table: PeerTable,
252 #[cfg(feature = "l2")]
253 pub(crate) l2_state: L2ConnState,
254 pub(crate) tx_broadcaster: ActorRef<TxBroadcaster>,
255 pub(crate) current_requests: HashMap<u64, (String, oneshot::Sender<Message>)>,
256 pub(crate) disconnect_reason: Option<DisconnectReason>,
258 pub(crate) is_validated: bool,
260 pub(crate) serve_request_window_start: Instant,
262 pub(crate) serve_requests_in_window: u64,
264 pub(crate) txs_sent_to_peer: u64,
266 pub(crate) received_txs_from_peer: bool,
268}
269
270impl Established {
271 async fn teardown(&mut self) {
272 for (_, (_announced, requested_hashes, _)) in self.requested_pooled_txs.drain() {
278 if let Err(e) = self
279 .blockchain
280 .mempool
281 .clear_in_flight_txs(&requested_hashes)
282 {
283 warn!(error = %e, "Failed to clear in-flight transaction tracking during peer teardown");
284 }
285 retry_on_alternates(&self.blockchain, &self.peer_table, &requested_hashes).await;
286 }
287 for (_announced, pending_hashes) in self.pending_tx_requests.drain(..) {
289 if let Err(e) = self.blockchain.mempool.clear_in_flight_txs(&pending_hashes) {
290 warn!(error = %e, "Failed to clear in-flight transaction tracking during peer teardown");
291 }
292 retry_on_alternates(&self.blockchain, &self.peer_table, &pending_hashes).await;
293 }
294 let _ = self
297 .sink
298 .close()
299 .await
300 .inspect_err(|err| debug!("Could not close the socket: {err}"));
301 }
302}
303
304#[derive(Debug)]
305pub enum ConnectionState {
306 HandshakeFailed,
307 Initiator(Initiator),
308 Receiver(Receiver),
309 Established(Box<Established>),
310}
311
312#[derive(Debug)]
313pub struct PeerConnectionServer {
314 state: ConnectionState,
315}
316
317#[actor(protocol = PeerConnectionServerProtocol)]
318impl PeerConnectionServer {
319 #[started]
320 async fn started(&mut self, ctx: &Context<Self>) {
321 let eth_version = Arc::new(RwLock::new(EthCapVersion::default()));
324 let state = std::mem::replace(&mut self.state, ConnectionState::HandshakeFailed);
326 match handshake::perform(state, eth_version.clone()).await {
327 Ok((mut established_state, stream)) => {
328 trace!(peer=%established_state.node, "Starting RLPx connection");
329 if let Err(reason) =
330 initialize_connection(ctx, &mut established_state, stream, eth_version).await
331 {
332 match &reason {
333 PeerConnectionError::NoMatchingCapabilities
334 | PeerConnectionError::HandshakeError(_) => {
335 if let Err(e) = established_state
336 .peer_table
337 .set_unwanted(established_state.node.node_id())
338 {
339 debug!("Failed to set peer as unwanted: {e}");
340 }
341 }
342 _ => {}
343 }
344 connection_failed(
345 &mut established_state,
346 "Failed to initialize RLPx connection",
347 &reason,
348 )
349 .await;
350
351 METRICS.record_new_rlpx_conn_failure(reason).await;
352
353 self.state = ConnectionState::Established(Box::new(established_state));
354 ctx.stop();
355 } else {
356 METRICS
357 .record_new_rlpx_conn_established(
358 &established_state
359 .node
360 .version
361 .clone()
362 .unwrap_or("Unknown".to_string()),
363 )
364 .await;
365 established_state.is_validated = true;
366 self.state = ConnectionState::Established(Box::new(established_state));
368 }
369 }
370 Err(err) => {
371 debug!("Failed Handshake on RLPx connection {err}");
374 self.state = ConnectionState::HandshakeFailed;
375 ctx.stop();
376 }
377 }
378 }
379
380 #[stopped]
381 async fn stopped(&mut self, _ctx: &Context<Self>) {
382 match std::mem::replace(&mut self.state, ConnectionState::HandshakeFailed) {
383 ConnectionState::Established(mut established_state) => {
384 trace!(peer=%established_state.node, "Closing connection with established peer");
385 if established_state.is_validated {
386 let reason = established_state
388 .disconnect_reason
389 .unwrap_or(DisconnectReason::NetworkError);
390 METRICS
391 .record_new_rlpx_conn_disconnection(
392 &established_state
393 .node
394 .version
395 .clone()
396 .unwrap_or("Unknown".to_string()),
397 reason,
398 )
399 .await;
400 }
401 if let Err(e) = established_state
402 .peer_table
403 .remove_peer(established_state.node.node_id())
404 {
405 debug!("Failed to remove peer from table: {e}");
406 }
407 established_state.teardown().await;
408 }
409 _ => {
410 }
412 };
413 }
414
415 #[send_handler]
416 async fn handle_incoming_message(
417 &mut self,
418 msg: peer_connection_server_protocol::IncomingMessage,
419 ctx: &Context<Self>,
420 ) {
421 if let ConnectionState::Established(ref mut established_state) = self.state {
422 trace!(
423 peer=%established_state.node,
424 message=%msg.message,
425 "Received incoming message",
426 );
427 let result = handle_incoming_message(established_state, msg.message).await;
428 Self::process_cast_error(&self.state, result, ctx);
429 } else {
430 debug!("Connection not yet established");
431 }
432 }
433
434 #[send_handler]
435 async fn handle_outgoing_message(
436 &mut self,
437 msg: peer_connection_server_protocol::OutgoingMessage,
438 ctx: &Context<Self>,
439 ) {
440 if let ConnectionState::Established(ref mut established_state) = self.state {
441 trace!(
442 peer=%established_state.node,
443 message=%msg.message,
444 "Received outgoing request",
445 );
446 let result = handle_outgoing_message(established_state, msg.message).await;
447 Self::process_cast_error(&self.state, result, ctx);
448 } else {
449 debug!("Connection not yet established");
450 }
451 }
452
453 #[send_handler]
454 async fn handle_outgoing_request(
455 &mut self,
456 msg: peer_connection_server_protocol::OutgoingRequest,
457 ctx: &Context<Self>,
458 ) {
459 if let ConnectionState::Established(ref mut established_state) = self.state {
460 trace!(
461 peer=%established_state.node,
462 message=%msg.message,
463 "Received outgoing request",
464 );
465 let Some(sender) = Arc::<oneshot::Sender<Message>>::into_inner(msg.sender) else {
466 debug!("Could not obtain sender channel: Arc has multiple references");
467 return;
468 };
469 let result = handle_outgoing_request(established_state, msg.message, sender).await;
470 Self::process_cast_error(&self.state, result, ctx);
471 } else {
472 debug!("Connection not yet established");
473 }
474 }
475
476 #[send_handler]
477 async fn handle_request_timeout(
478 &mut self,
479 msg: peer_connection_server_protocol::RequestTimeout,
480 _ctx: &Context<Self>,
481 ) {
482 if let ConnectionState::Established(ref mut established_state) = self.state {
483 if let Some((msg_type, _)) = established_state.current_requests.remove(&msg.id) {
485 debug!(
486 peer=%established_state.node,
487 %msg_type,
488 id=%msg.id,
489 "Request timedout",
490 );
491 }
492 } else {
493 debug!("Connection not yet established");
494 }
495 }
496
497 #[send_handler]
498 async fn handle_send_ping(
499 &mut self,
500 _msg: peer_connection_server_protocol::SendPing,
501 ctx: &Context<Self>,
502 ) {
503 if let ConnectionState::Established(ref mut established_state) = self.state {
504 let result = send(established_state, Message::Ping(PingMessage {})).await;
505 Self::process_cast_error(&self.state, result, ctx);
506 } else {
507 debug!("Connection not yet established");
508 }
509 }
510
511 #[send_handler]
512 async fn handle_block_range_update(
513 &mut self,
514 _msg: peer_connection_server_protocol::BlockRangeUpdate,
515 ctx: &Context<Self>,
516 ) {
517 if let ConnectionState::Established(ref mut established_state) = self.state {
518 trace!(
519 peer=%established_state.node,
520 "Block Range Update"
521 );
522 let result = handle_block_range_update(established_state).await;
523 Self::process_cast_error(&self.state, result, ctx);
524 } else {
525 debug!("Connection not yet established");
526 }
527 }
528
529 #[send_handler]
530 async fn handle_sweep_inflight_txs(
531 &mut self,
532 _msg: peer_connection_server_protocol::SweepInflightTxs,
533 _ctx: &Context<Self>,
534 ) {
535 if let ConnectionState::Established(ref mut state) = self.state {
536 let now = Instant::now();
537 let stale_ids: Vec<u64> = state
538 .requested_pooled_txs
539 .iter()
540 .filter(|(_, (_, _, ts))| now.duration_since(*ts) > INFLIGHT_TX_TIMEOUT)
541 .map(|(id, _)| *id)
542 .collect();
543 for id in stale_ids {
544 if let Some((_announced, hashes, _)) = state.requested_pooled_txs.remove(&id) {
545 if let Err(e) = state.blockchain.mempool.clear_in_flight_txs(&hashes) {
548 warn!(error = %e, "Failed to clear in-flight transaction tracking while sweeping stale requests");
549 }
550 retry_on_alternates(&state.blockchain, &state.peer_table, &hashes).await;
551 }
552 }
553 }
554 }
555
556 #[send_handler]
557 async fn handle_flush_pending_tx_requests(
558 &mut self,
559 _msg: peer_connection_server_protocol::FlushPendingTxRequests,
560 ctx: &Context<Self>,
561 ) {
562 if let ConnectionState::Established(ref mut established_state) = self.state {
563 let result = flush_pending_tx_requests(established_state).await;
564 Self::process_cast_error(&self.state, result, ctx);
565 }
566 }
567
568 #[send_handler]
569 async fn handle_enqueue_tx_requests(
570 &mut self,
571 msg: peer_connection_server_protocol::EnqueueTxRequests,
572 _ctx: &Context<Self>,
573 ) {
574 if let ConnectionState::Established(ref mut state) = self.state {
575 let to_request: Vec<H256> = match state.blockchain.mempool.reserve_unknown_hashes(
578 &msg.announcement.transaction_hashes,
579 &msg.announcement.transaction_types,
580 &msg.announcement.transaction_sizes,
581 state.node.node_id(),
582 ) {
583 Ok(unknown) => unknown,
584 Err(_) => return,
585 };
586 if to_request.is_empty() {
587 return;
588 }
589 let trimmed = msg.announcement.filter_to(&to_request);
590 state.pending_tx_requests.push((trimmed, to_request));
591 }
592 }
593
594 #[send_handler]
595 async fn handle_broadcast_message(
596 &mut self,
597 msg: peer_connection_server_protocol::BroadcastMessage,
598 ctx: &Context<Self>,
599 ) {
600 if let ConnectionState::Established(ref mut established_state) = self.state {
601 trace!(
602 peer=%established_state.node,
603 message=%msg.msg,
604 "Received broadcasted message",
605 );
606 let result = handle_broadcast(established_state, (msg.task_id, msg.msg)).await;
607 Self::process_cast_error(&self.state, result, ctx);
608 } else {
609 debug!("Connection not yet established");
610 }
611 }
612
613 #[cfg(feature = "l2")]
614 #[send_handler]
615 async fn handle_l2_message(&mut self, msg: L2Message, ctx: &Context<Self>) {
616 if let ConnectionState::Established(ref mut established_state) = self.state {
617 let peer_supports_l2 = established_state.l2_state.connection_state().is_ok();
618 let result = if peer_supports_l2 {
619 trace!(
620 peer=%established_state.node,
621 message=?msg.msg,
622 "Handling cast for L2 msg"
623 );
624 match msg.msg {
625 L2Cast::BatchBroadcast => {
626 let res = l2_connection::send_sealed_batch(established_state).await;
627 res.and(l2_connection::process_batches_on_queue(established_state).await)
628 }
629 L2Cast::BlockBroadcast => {
630 let res = l2_connection::send_new_block(established_state).await;
631 res.and(l2_connection::process_blocks_on_queue(established_state).await)
632 }
633 }
634 } else {
635 Err(PeerConnectionError::MessageNotHandled(
636 "Unknown message or capability not handled".to_string(),
637 ))
638 };
639 Self::process_cast_error(&self.state, result, ctx);
640 } else {
641 debug!("Connection not yet established");
642 }
643 }
644
645 fn process_cast_error(
646 state: &ConnectionState,
647 result: Result<(), PeerConnectionError>,
648 ctx: &Context<Self>,
649 ) {
650 if let Err(e) = result
651 && let ConnectionState::Established(established_state) = state
652 {
653 match e {
654 PeerConnectionError::Disconnected
655 | PeerConnectionError::DisconnectReceived(_)
656 | PeerConnectionError::DisconnectSent(_)
657 | PeerConnectionError::HandshakeError(_)
658 | PeerConnectionError::NoMatchingCapabilities
659 | PeerConnectionError::InvalidPeerId
660 | PeerConnectionError::InvalidMessageLength
661 | PeerConnectionError::StateError(_)
662 | PeerConnectionError::InvalidRecoveryId => {
663 trace!(peer=%established_state.node, error=e.to_string(), "Peer connection error");
664 ctx.stop();
665 }
666 PeerConnectionError::IoError(ref io_e)
667 if io_e.kind() == std::io::ErrorKind::BrokenPipe =>
668 {
669 debug!(peer=%established_state.node, "Broken pipe with peer, disconnected");
672 ctx.stop();
673 }
674 PeerConnectionError::StoreError(StoreError::Trie(TrieError::InconsistentTree(
675 _,
676 ))) => {
677 if established_state.blockchain.is_synced() {
678 error!(
681 peer=%established_state.node,
682 error=%e,
683 "Inconsistent trie while serving peer request; local state may be corrupted",
684 );
685 } else {
686 trace!(
688 peer=%established_state.node,
689 error=%e,
690 "Error handling cast message",
691 );
692 }
693 }
694 _ => {
695 debug!(
697 peer=%established_state.node,
698 capabilities=?established_state.capabilities,
699 error=%e,
700 "Error handling cast message",
701 );
702 }
703 }
704 }
705 }
706}
707
708async fn initialize_connection<S>(
709 ctx: &Context<PeerConnectionServer>,
710 state: &mut Established,
711 mut stream: S,
712 eth_version: Arc<RwLock<EthCapVersion>>,
713) -> Result<(), PeerConnectionError>
714where
715 S: Unpin + Send + Stream<Item = Result<Message, PeerConnectionError>> + 'static,
716{
717 if state.peer_table.target_peers_reached().await? {
718 debug!(peer=%state.node, "Reached target peer connections, discarding.");
719 return Err(PeerConnectionError::TooManyPeers);
720 }
721 exchange_hello_messages(state, &mut stream).await?;
722
723 let version = match &state.negotiated_eth_capability {
725 Some(cap) if cap == &Capability::eth(68) => EthCapVersion::V68,
726 Some(cap) if cap == &Capability::eth(69) => EthCapVersion::V69,
727 Some(cap) if cap == &Capability::eth(70) => EthCapVersion::V70,
728 Some(cap) if cap == &Capability::eth(71) => EthCapVersion::V71,
729 _ => EthCapVersion::default(),
730 };
731 *eth_version
732 .write()
733 .map_err(|err| PeerConnectionError::InternalError(err.to_string()))? = version;
734
735 init_capabilities(state, &mut stream).await?;
736
737 let mut connection = PeerConnection {
738 handle: ctx.actor_ref(),
739 };
740
741 state.peer_table.new_connected_peer(
742 state.node.clone(),
743 connection.clone(),
744 state.capabilities.clone(),
745 )?;
746
747 trace!(peer=%state.node, "Peer connection initialized.");
748
749 send_all_pooled_tx_hashes(state, &mut connection).await?;
751
752 send_interval(
754 PING_INTERVAL,
755 ctx.clone(),
756 peer_connection_server_protocol::SendPing,
757 );
758
759 send_interval(
761 BLOCK_RANGE_UPDATE_INTERVAL,
762 ctx.clone(),
763 peer_connection_server_protocol::BlockRangeUpdate,
764 );
765
766 send_interval(
768 INFLIGHT_TX_SWEEP_INTERVAL,
769 ctx.clone(),
770 peer_connection_server_protocol::SweepInflightTxs,
771 );
772
773 send_interval(
775 TX_REQUEST_BATCH_INTERVAL,
776 ctx.clone(),
777 peer_connection_server_protocol::FlushPendingTxRequests,
778 );
779
780 #[cfg(feature = "l2")]
781 if state.l2_state.connection_state().is_ok() {
783 send_interval(
784 PERIODIC_BLOCK_BROADCAST_INTERVAL,
785 ctx.clone(),
786 L2Message {
787 msg: L2Cast::BlockBroadcast,
788 },
789 );
790 send_interval(
791 PERIODIC_BATCH_BROADCAST_INTERVAL,
792 ctx.clone(),
793 L2Message {
794 msg: L2Cast::BatchBroadcast,
795 },
796 );
797 }
798
799 spawn_listener(
800 ctx.clone(),
801 stream.filter_map(|result| match result {
802 Ok(msg) => Some(peer_connection_server_protocol::IncomingMessage { message: msg }),
803 Err(e) => {
804 debug!(error=?e, "Error receiving RLPx message");
805 None
807 }
808 }),
809 );
810
811 if state.negotiated_eth_capability.is_some() {
812 let stream: BroadcastStream<(Id, Arc<Message>)> =
813 BroadcastStream::new(state.connection_broadcast_send.subscribe());
814 let message_stream = stream.filter_map(|result| {
815 result.ok().map(
816 |(id, msg)| peer_connection_server_protocol::BroadcastMessage { task_id: id, msg },
817 )
818 });
819 spawn_listener(ctx.clone(), message_stream);
820 }
821
822 Ok(())
823}
824
825async fn send_all_pooled_tx_hashes(
826 state: &mut Established,
827 connection: &mut PeerConnection,
828) -> Result<(), PeerConnectionError> {
829 let txs: Vec<MempoolTransaction> = state
830 .blockchain
831 .mempool
832 .get_all_txs_by_sender()?
833 .into_values()
834 .flatten()
835 .filter(|tx| !tx.is_privileged())
836 .collect();
837 if !txs.is_empty() {
838 state
839 .tx_broadcaster
840 .add_txs(
841 txs.iter().map(|tx| tx.hash()).collect(),
842 state.node.node_id(),
843 )
844 .map_err(|e| PeerConnectionError::BroadcastError(e.to_string()))?;
845 send_tx_hashes(
846 txs,
847 state.capabilities.clone(),
848 connection,
849 state.node.node_id(),
850 &state.blockchain,
851 )
852 .await
853 .map_err(|e| PeerConnectionError::SendMessage(e.to_string()))?;
854 }
855 Ok(())
856}
857
858async fn send_block_range_update(state: &mut Established) -> Result<(), PeerConnectionError> {
859 if state
861 .negotiated_eth_capability
862 .as_ref()
863 .is_some_and(|eth| eth.version >= 69)
864 {
865 trace!(peer=%state.node, "Sending BlockRangeUpdate");
866 let update = BlockRangeUpdate::new(&state.storage).await?;
867 let lastet_block = update.latest_block;
868 send(state, Message::BlockRangeUpdate(update)).await?;
869 state.last_block_range_update_block = lastet_block - (lastet_block % 32);
870 }
871 Ok(())
872}
873
874async fn should_send_block_range_update(state: &Established) -> Result<bool, PeerConnectionError> {
875 let latest_block = state.storage.get_latest_block_number().await?;
876 if latest_block < state.last_block_range_update_block
877 || latest_block - state.last_block_range_update_block >= 32
878 {
879 return Ok(true);
880 }
881 Ok(false)
882}
883
884async fn init_capabilities<S>(
885 state: &mut Established,
886 stream: &mut S,
887) -> Result<(), PeerConnectionError>
888where
889 S: Unpin + Stream<Item = Result<Message, PeerConnectionError>>,
890{
891 if let Some(eth) = state.negotiated_eth_capability.clone() {
893 let status = match eth.version {
894 68 => Message::Status68(StatusMessage68::new(&state.storage).await?),
895 69 => Message::Status69(StatusMessage69::new(&state.storage).await?),
896 70 => Message::Status70(StatusMessage70::new(&state.storage).await?),
897 71 => Message::Status71(StatusMessage71::new(&state.storage).await?),
898 ver => {
899 return Err(PeerConnectionError::HandshakeError(format!(
900 "Invalid eth version {ver}"
901 )));
902 }
903 };
904 trace!(peer=%state.node, "Sending status");
905 send(state, status).await?;
906 let msg = match receive(stream).await {
910 Some(msg) => msg?,
911 None => return Err(PeerConnectionError::Disconnected),
912 };
913 match msg {
914 Message::Status68(msg_data) => {
915 trace!(peer=%state.node, "Received Status(68)");
916 backend::validate_status(msg_data, &state.storage, ð).await?
917 }
918 Message::Status69(msg_data) => {
919 trace!(peer=%state.node, "Received Status(69)");
920 backend::validate_status(msg_data, &state.storage, ð).await?
921 }
922 Message::Status70(msg_data) => {
923 trace!(peer=%state.node, "Received Status(70)");
924 backend::validate_status(msg_data, &state.storage, ð).await?
925 }
926 Message::Status71(msg_data) => {
927 trace!(peer=%state.node, "Received Status(71)");
928 backend::validate_status(msg_data, &state.storage, ð).await?
929 }
930 Message::Disconnect(disconnect) => {
931 return Err(PeerConnectionError::HandshakeError(format!(
932 "Peer disconnected due to: {}",
933 disconnect.reason()
934 )));
935 }
936 _ => {
937 return Err(PeerConnectionError::HandshakeError(
938 "Expected a Status message".to_string(),
939 ));
940 }
941 }
942 }
943 Ok(())
944}
945
946async fn send_disconnect_message(state: &mut Established, reason: Option<DisconnectReason>) {
947 send(state, Message::Disconnect(DisconnectMessage { reason }))
948 .await
949 .unwrap_or_else(|_| {
950 debug!(
951 peer=%state.node,
952 ?reason,
953 "Could not send Disconnect message",
954 );
955 });
956}
957
958async fn connection_failed(state: &mut Established, error_text: &str, error: &PeerConnectionError) {
959 debug!(
960 peer=%state.node,
961 %error_text,
962 %error,
963 "connection failure"
964 );
965
966 if !matches!(error, PeerConnectionError::DisconnectReceived(_)) {
969 send_disconnect_message(state, match_disconnect_reason(error)).await;
970 }
971
972 match error {
974 PeerConnectionError::DisconnectReceived(DisconnectReason::AlreadyConnected)
976 | PeerConnectionError::DisconnectSent(DisconnectReason::AlreadyConnected) => {
977 debug!(
978 peer=%state.node,
979 %error_text,
980 %error,
981 "Peer already connected, don't replace it"
982 );
983 }
984 _ => {
985 debug!(
986 peer=%state.node,
987 %error_text,
988 %error,
989 remote_public_key=%state.node.public_key,
990 "discarding peer",
991 );
992 }
993 }
994}
995
996fn match_disconnect_reason(error: &PeerConnectionError) -> Option<DisconnectReason> {
997 match error {
998 PeerConnectionError::DisconnectSent(reason) => Some(*reason),
999 PeerConnectionError::DisconnectReceived(reason) => Some(*reason),
1000 PeerConnectionError::RLPDecodeError(_) => Some(DisconnectReason::NetworkError),
1001 PeerConnectionError::TooManyPeers => Some(DisconnectReason::TooManyPeers),
1002 _ => None,
1004 }
1005}
1006
1007async fn exchange_hello_messages<S>(
1008 state: &mut Established,
1009 stream: &mut S,
1010) -> Result<(), PeerConnectionError>
1011where
1012 S: Unpin + Stream<Item = Result<Message, PeerConnectionError>>,
1013{
1014 #[allow(unused_mut)]
1017 let mut supported_capabilities: Vec<Capability> = [
1018 &SUPPORTED_ETH_CAPABILITIES[..],
1019 &SUPPORTED_SNAP_CAPABILITIES[..],
1020 ]
1021 .concat();
1022 #[cfg(feature = "l2")]
1023 if state.l2_state.is_supported() {
1024 supported_capabilities.push(crate::rlpx::l2::SUPPORTED_BASED_CAPABILITIES[0].clone());
1025 }
1026 let hello_msg = Message::Hello(p2p::HelloMessage::new(
1027 supported_capabilities,
1028 PublicKey::from_secret_key(secp256k1::SECP256K1, &state.signer),
1029 state.client_version.clone(),
1030 ));
1031
1032 send(state, hello_msg).await?;
1033
1034 let msg = match receive(stream).await {
1036 Some(msg) => msg?,
1037 None => return Err(PeerConnectionError::Disconnected),
1038 };
1039
1040 match msg {
1041 Message::Hello(hello_message) => {
1042 let mut negotiated_eth_version = 0;
1043 let mut negotiated_snap_version = 0;
1044
1045 trace!(
1046 peer=%state.node,
1047 capabilities=?hello_message.capabilities,
1048 "Hello message capabilities",
1049 );
1050
1051 for cap in &hello_message.capabilities {
1053 match cap.protocol() {
1054 "eth" => {
1055 if SUPPORTED_ETH_CAPABILITIES.contains(cap)
1056 && cap.version > negotiated_eth_version
1057 {
1058 negotiated_eth_version = cap.version;
1059 }
1060 }
1061 "snap" => {
1062 if SUPPORTED_SNAP_CAPABILITIES.contains(cap)
1063 && cap.version > negotiated_snap_version
1064 {
1065 negotiated_snap_version = cap.version;
1066 }
1067 }
1068 #[cfg(feature = "l2")]
1069 "based" if state.l2_state.is_supported() => {
1070 state.l2_state.set_established()?;
1071 }
1072 _ => {}
1073 }
1074 }
1075
1076 state.capabilities = hello_message.capabilities;
1077
1078 if negotiated_eth_version == 0 {
1079 return Err(PeerConnectionError::NoMatchingCapabilities);
1080 }
1081 debug!("Negotiated eth version: eth/{}", negotiated_eth_version);
1082 state.negotiated_eth_capability = Some(Capability::eth(negotiated_eth_version));
1083
1084 if negotiated_snap_version != 0 {
1085 debug!("Negotiated snap version: snap/{}", negotiated_snap_version);
1086 state.negotiated_snap_capability = Some(Capability::snap(negotiated_snap_version));
1087 }
1088
1089 state.node.version = Some(hello_message.client_id);
1090
1091 Ok(())
1092 }
1093 Message::Disconnect(disconnect) => {
1094 Err(PeerConnectionError::DisconnectReceived(disconnect.reason()))
1095 }
1096 _ => {
1097 Err(PeerConnectionError::BadRequest(
1099 "Expected Hello message".to_string(),
1100 ))
1101 }
1102 }
1103}
1104
1105pub(crate) async fn send(
1106 state: &mut Established,
1107 message: Message,
1108) -> Result<(), PeerConnectionError> {
1109 #[cfg(feature = "metrics")]
1110 {
1111 use ethrex_metrics::p2p::METRICS_P2P;
1112 METRICS_P2P.inc_outgoing_message(message.metric_label());
1113 }
1114 state.sink.send(message).await
1115}
1116
1117async fn receive<S>(stream: &mut S) -> Option<Result<Message, PeerConnectionError>>
1129where
1130 S: Unpin + Stream<Item = Result<Message, PeerConnectionError>>,
1131{
1132 stream.next().await
1133}
1134
1135fn check_serve_request_rate(state: &mut Established) -> bool {
1138 let now = Instant::now();
1139 if now.duration_since(state.serve_request_window_start) >= SERVE_REQUEST_WINDOW {
1140 state.serve_request_window_start = now;
1141 state.serve_requests_in_window = 0;
1142 }
1143 state.serve_requests_in_window += 1;
1144 state.serve_requests_in_window <= MAX_SERVE_REQUESTS_PER_WINDOW
1145}
1146
1147async fn handle_incoming_message(
1148 state: &mut Established,
1149 message: Message,
1150) -> Result<(), PeerConnectionError> {
1151 #[cfg(feature = "metrics")]
1152 {
1153 use ethrex_metrics::p2p::METRICS_P2P;
1154 METRICS_P2P.inc_incoming_message(message.metric_label());
1155 }
1156
1157 let is_data_request = matches!(
1159 message,
1160 Message::GetBlockHeaders(_)
1161 | Message::GetBlockBodies(_)
1162 | Message::GetReceipts68(_)
1163 | Message::GetReceipts69(_)
1164 | Message::GetReceipts70(_)
1165 | Message::GetPooledTransactions(_)
1166 | Message::GetAccountRange(_)
1167 | Message::GetStorageRanges(_)
1168 | Message::GetByteCodes(_)
1169 | Message::GetTrieNodes(_)
1170 );
1171 if is_data_request && !check_serve_request_rate(state) {
1172 debug!(
1173 peer = %state.node,
1174 window_requests = state.serve_requests_in_window,
1175 "Disconnecting peer: exceeded incoming request rate limit",
1176 );
1177 send_disconnect_message(state, Some(DisconnectReason::UselessPeer)).await;
1178 return Err(PeerConnectionError::DisconnectSent(
1179 DisconnectReason::UselessPeer,
1180 ));
1181 }
1182
1183 let peer_supports_eth = state.negotiated_eth_capability.is_some();
1184 #[cfg(feature = "l2")]
1185 let peer_supports_l2 = state.l2_state.connection_state().is_ok();
1186 match message {
1187 Message::Disconnect(msg_data) => {
1188 let reason = msg_data.reason();
1189 trace!(
1190 peer=%state.node,
1191 ?reason,
1192 "Received Disconnect"
1193 );
1194 state.disconnect_reason = Some(reason);
1195
1196 return Err(PeerConnectionError::DisconnectReceived(reason));
1199 }
1200 Message::Ping(_) => {
1201 trace!(peer=%state.node, "Sending pong message");
1202 send(state, Message::Pong(PongMessage {})).await?;
1203 }
1204 Message::Pong(_) => {
1205 }
1207 Message::Status68(msg_data) => {
1208 if let Some(eth) = &state.negotiated_eth_capability {
1209 backend::validate_status(msg_data, &state.storage, eth).await?
1210 };
1211 }
1212 Message::Status69(msg_data) => {
1213 if let Some(eth) = &state.negotiated_eth_capability {
1214 backend::validate_status(msg_data, &state.storage, eth).await?
1215 };
1216 }
1217 Message::Status70(msg_data) => {
1218 if let Some(eth) = &state.negotiated_eth_capability {
1219 backend::validate_status(msg_data, &state.storage, eth).await?
1220 };
1221 }
1222 Message::Status71(msg_data) => {
1223 if let Some(eth) = &state.negotiated_eth_capability {
1224 backend::validate_status(msg_data, &state.storage, eth).await?
1225 };
1226 }
1227 Message::GetAccountRange(req) => {
1228 let response = process_account_range_request(req, state.storage.clone()).await?;
1229 send(state, Message::AccountRange(response)).await?
1230 }
1231 Message::Transactions(txs) if peer_supports_eth => {
1232 if !txs.transactions.is_empty() {
1234 state.received_txs_from_peer = true;
1235 }
1236 if state.blockchain.is_synced() {
1237 let tx_hashes: Vec<_> = txs.transactions.iter().map(|tx| tx.hash()).collect();
1238
1239 let blockchain = state.blockchain.clone();
1242 let peer = state.node.to_string();
1243 #[cfg(feature = "l2")]
1244 let is_l2_mode = state.l2_state.is_supported();
1245 tokio::spawn(async move {
1246 for tx in txs.transactions {
1247 #[cfg(feature = "l2")]
1248 if (is_l2_mode && matches!(tx, Transaction::EIP4844Transaction(_)))
1249 || tx.is_privileged()
1250 {
1251 let tx_type = tx.tx_type();
1252 debug!(peer=%peer, "Rejecting transaction in L2 mode - {tx_type} transactions are not broadcasted in L2");
1253 continue;
1254 }
1255
1256 if let Err(e) = blockchain.add_transaction_to_pool(tx).await {
1257 debug!(
1258 peer=%peer,
1259 error=%e,
1260 "Error adding transaction"
1261 );
1262 }
1263 }
1264 });
1265
1266 state
1270 .tx_broadcaster
1271 .add_txs(tx_hashes, state.node.node_id())
1272 .map_err(|e| PeerConnectionError::BroadcastError(e.to_string()))?;
1273 }
1274 }
1275 Message::GetBlockHeaders(msg_data) if peer_supports_eth => {
1276 let response = BlockHeaders {
1277 id: msg_data.id,
1278 block_headers: msg_data.fetch_headers(&state.storage).await,
1279 };
1280 send(state, Message::BlockHeaders(response)).await?;
1281 }
1282 Message::GetBlockBodies(msg_data) if peer_supports_eth => {
1283 let response = BlockBodies {
1284 id: msg_data.id,
1285 block_bodies: msg_data.fetch_blocks(&state.storage).await,
1286 };
1287 send(state, Message::BlockBodies(response)).await?;
1288 }
1289 Message::GetBlockAccessLists(GetBlockAccessLists { id, block_hashes })
1290 if peer_supports_eth =>
1291 {
1292 use crate::rlpx::eth::block_access_lists::BLOCK_ACCESS_LIST_LIMIT;
1293 let mut block_access_lists =
1294 Vec::with_capacity(block_hashes.len().min(BLOCK_ACCESS_LIST_LIMIT));
1295 for hash in &block_hashes {
1296 let bal = match state.storage.get_block_access_list(*hash) {
1302 Ok(Some(bal)) => {
1303 let commitment = match state.storage.get_block_header_by_hash(*hash) {
1304 Ok(Some(header)) => header.block_access_list_hash,
1305 Ok(None) => None,
1306 Err(err) => {
1307 warn!(
1311 "Failed to read header for BAL commitment check (hash {hash:#x}): {err}; reporting BAL unavailable"
1312 );
1313 None
1314 }
1315 };
1316 bal.matches_commitment(commitment).then_some(bal)
1317 }
1318 Ok(None) => None,
1319 Err(err) => {
1320 error!("Error accessing DB while building BAL response for peer: {err}");
1321 None
1322 }
1323 };
1324 block_access_lists.push(bal);
1325 if block_access_lists.len() >= BLOCK_ACCESS_LIST_LIMIT {
1326 break;
1327 }
1328 }
1329 let response = BlockAccessLists::new(id, block_access_lists);
1330 send(state, Message::BlockAccessLists(response)).await?;
1331 }
1332 Message::GetReceipts68(GetReceipts68 { id, block_hashes }) if peer_supports_eth => {
1333 let mut receipts = Vec::new();
1334 for hash in block_hashes.iter() {
1335 receipts.push(state.storage.get_receipts_for_block(hash).await?);
1336 }
1337 send(state, Message::Receipts68(Receipts68::new(id, receipts))).await?;
1338 }
1339 Message::GetReceipts69(GetReceipts68 { id, block_hashes }) if peer_supports_eth => {
1340 let mut receipts = Vec::new();
1341 for hash in block_hashes.iter() {
1342 receipts.push(state.storage.get_receipts_for_block(hash).await?);
1343 }
1344 send(state, Message::Receipts69(Receipts69::new(id, receipts))).await?;
1345 }
1346 Message::GetReceipts70(GetReceipts70 {
1348 id,
1349 first_block_receipt_index,
1350 block_hashes,
1351 }) if peer_supports_eth => {
1352 let block_hashes = &block_hashes[..block_hashes.len().min(256)];
1353 let mut all_receipts: Vec<Vec<Receipt>> = Vec::new();
1354 let mut total_size: usize = 0;
1355 let mut last_block_incomplete = false;
1356
1357 for (i, hash) in block_hashes.iter().enumerate() {
1358 let start_index = if i == 0 { first_block_receipt_index } else { 0 };
1359 let block_receipts = state
1360 .storage
1361 .get_receipts_for_block_from_index(hash, start_index, None)
1362 .await?;
1363
1364 let mut block_receipt_list = Vec::new();
1365 let mut hit_limit = false;
1366 for receipt in block_receipts {
1367 let receipt_size = receipt.length();
1368 if total_size + receipt_size > SOFT_RESPONSE_LIMIT
1369 && (!block_receipt_list.is_empty() || !all_receipts.is_empty())
1370 {
1371 hit_limit = true;
1372 if !block_receipt_list.is_empty() {
1378 last_block_incomplete = true;
1379 }
1380 break;
1381 }
1382 total_size += receipt_size;
1383 block_receipt_list.push(receipt);
1384 }
1385
1386 if !block_receipt_list.is_empty() || !hit_limit {
1391 all_receipts.push(block_receipt_list);
1392 }
1393
1394 if hit_limit {
1395 break;
1396 }
1397 }
1398
1399 let response =
1400 Message::Receipts70(Receipts70::new(id, last_block_incomplete, all_receipts));
1401 send(state, response).await?;
1402 }
1403 Message::BlockRangeUpdate(update) => {
1404 trace!(
1405 peer=%state.node,
1406 range_from=update.earliest_block,
1407 range_to=update.latest_block,
1408 "Block range update",
1409 );
1410 if let Err(err) = update.validate() {
1412 debug!(
1413 peer=%state.node,
1414 reason=%err,
1415 "Disconnecting peer: invalid block range update",
1416 );
1417 send_disconnect_message(state, Some(DisconnectReason::SubprotocolError)).await;
1418 return Err(PeerConnectionError::DisconnectSent(
1419 DisconnectReason::SubprotocolError,
1420 ));
1421 }
1422 }
1423 Message::NewPooledTransactionHashes(new_pooled_transaction_hashes) if peer_supports_eth => {
1424 if state.blockchain.is_synced() {
1426 let hashes = new_pooled_transaction_hashes
1427 .get_transactions_to_request(&state.blockchain, state.node.node_id())?;
1428 if !hashes.is_empty() {
1429 state
1432 .pending_tx_requests
1433 .push((new_pooled_transaction_hashes, hashes));
1434 }
1435 }
1436 }
1437 Message::GetPooledTransactions(msg) => {
1438 let response = msg.handle(&state.blockchain)?;
1439 let batch_size = response.pooled_transactions.len() as u64;
1440 if state.txs_sent_to_peer + batch_size > LEECH_TX_SENT_THRESHOLD
1442 && !state.received_txs_from_peer
1443 {
1444 debug!(
1445 peer = %state.node,
1446 txs_sent = state.txs_sent_to_peer,
1447 "Disconnecting peer: leech detected (sent many txs but received none)",
1448 );
1449 send_disconnect_message(state, Some(DisconnectReason::UselessPeer)).await;
1450 return Err(PeerConnectionError::DisconnectSent(
1451 DisconnectReason::UselessPeer,
1452 ));
1453 }
1454 send(state, Message::PooledTransactions(response)).await?;
1455 state.txs_sent_to_peer += batch_size;
1456 }
1457 Message::PooledTransactions(msg) if peer_supports_eth => {
1458 if !msg.pooled_transactions.is_empty() {
1459 state.received_txs_from_peer = true;
1460 }
1461 let removed_request = state.requested_pooled_txs.remove(&msg.id);
1464 if let Some((_, ref requested_hashes, _)) = removed_request {
1465 state
1466 .blockchain
1467 .mempool
1468 .clear_in_flight_txs(requested_hashes)?;
1469 }
1470 for tx in &msg.pooled_transactions {
1472 if let P2PTransaction::EIP4844TransactionWithBlobs(itx) = tx
1473 && (itx.blobs_bundle.is_empty()
1474 || itx
1475 .blobs_bundle
1476 .validate_blob_commitment_hashes(&itx.tx.blob_versioned_hashes)
1477 .is_err())
1478 {
1479 debug!(
1480 peer=%state.node,
1481 "Disconnecting peer: invalid or missing blobs",
1482 );
1483 if let Some((_announced, requested_hashes, _)) = &removed_request {
1484 retry_on_alternates(&state.blockchain, &state.peer_table, requested_hashes)
1485 .await;
1486 }
1487 send_disconnect_message(state, Some(DisconnectReason::SubprotocolError)).await;
1488 return Err(PeerConnectionError::DisconnectSent(
1489 DisconnectReason::SubprotocolError,
1490 ));
1491 }
1492 }
1493 if state.blockchain.is_synced() {
1494 if let Some((announced, requested_hashes, _)) = &removed_request {
1495 let fork = state.blockchain.current_fork().await?;
1496 if let Err(error) = msg.validate_requested(announced, fork) {
1497 debug!(
1498 peer=%state.node,
1499 reason=%error,
1500 "Disconnecting peer: invalid pooled transactions response",
1501 );
1502 retry_on_alternates(&state.blockchain, &state.peer_table, requested_hashes)
1503 .await;
1504 send_disconnect_message(state, Some(DisconnectReason::SubprotocolError))
1505 .await;
1506 return Err(PeerConnectionError::DisconnectSent(
1507 DisconnectReason::SubprotocolError,
1508 ));
1509 }
1510 }
1511 #[cfg(feature = "l2")]
1512 let is_l2_mode = state.l2_state.is_supported();
1513
1514 #[cfg(not(feature = "l2"))]
1515 let is_l2_mode = false;
1516 if let Err(error) = msg.handle(&state.node, &state.blockchain, is_l2_mode).await {
1517 if matches!(
1518 error,
1519 ethrex_blockchain::error::MempoolError::BlobsBundleError(_)
1520 ) {
1521 debug!(
1522 peer=%state.node,
1523 reason=%error,
1524 "Disconnecting peer: invalid pooled transactions response",
1525 );
1526 if let Some((_announced, requested_hashes, _)) = &removed_request {
1527 retry_on_alternates(
1528 &state.blockchain,
1529 &state.peer_table,
1530 requested_hashes,
1531 )
1532 .await;
1533 }
1534 send_disconnect_message(state, Some(DisconnectReason::SubprotocolError))
1535 .await;
1536 return Err(PeerConnectionError::DisconnectSent(
1537 DisconnectReason::SubprotocolError,
1538 ));
1539 }
1540 return Err(error.into());
1541 }
1542 }
1543 }
1544 Message::GetStorageRanges(req) => {
1545 let response = process_storage_ranges_request(req, state.storage.clone()).await?;
1546 send(state, Message::StorageRanges(response)).await?
1547 }
1548 Message::GetByteCodes(req) => {
1549 let storage_clone = state.storage.clone();
1550 let response = process_byte_codes_request(req, storage_clone)
1551 .await
1552 .map_err(|_| {
1553 PeerConnectionError::InternalError(
1554 "Failed to execute bytecode retrieval task".to_string(),
1555 )
1556 })?;
1557 send(state, Message::ByteCodes(response)).await?
1558 }
1559 Message::GetTrieNodes(req) => {
1560 let id = req.id;
1561 match process_trie_nodes_request(req, state.storage.clone()).await {
1562 Ok(response) => send(state, Message::TrieNodes(response)).await?,
1563 Err(_) => send(state, Message::TrieNodes(TrieNodes { id, nodes: vec![] })).await?,
1564 }
1565 }
1566 #[cfg(feature = "l2")]
1567 Message::L2(req) if peer_supports_l2 => {
1568 handle_based_capability_message(state, req).await?;
1569 }
1570 message @ Message::AccountRange(_)
1572 | message @ Message::StorageRanges(_)
1573 | message @ Message::ByteCodes(_)
1574 | message @ Message::TrieNodes(_)
1575 | message @ Message::BlockBodies(_)
1576 | message @ Message::BlockHeaders(_)
1577 | message @ Message::Receipts68(_)
1578 | message @ Message::Receipts69(_)
1579 | message @ Message::Receipts70(_)
1580 | message @ Message::BlockAccessLists(_) => {
1581 if let Some((_, tx)) = message
1582 .request_id()
1583 .and_then(|id| state.current_requests.remove(&id))
1584 {
1585 tx.send(message)
1586 .map_err(|e| PeerConnectionError::SendMessage(e.to_string()))?
1587 } else {
1588 return Err(PeerConnectionError::ExpectedRequestId(format!("{message}")));
1589 }
1590 }
1591 message => return Err(PeerConnectionError::MessageNotHandled(format!("{message}"))),
1593 };
1594 Ok(())
1595}
1596
1597async fn handle_outgoing_message(
1598 state: &mut Established,
1599 message: Message,
1600) -> Result<(), PeerConnectionError> {
1601 trace!(
1602 peer=%state.node,
1603 %message,
1604 "Sending message"
1605 );
1606 send(state, message).await?;
1607 Ok(())
1608}
1609
1610async fn handle_outgoing_request(
1611 state: &mut Established,
1612 message: Message,
1613 sender: oneshot::Sender<Message>,
1614) -> Result<(), PeerConnectionError> {
1615 message.request_id().and_then(|id| {
1617 state
1618 .current_requests
1619 .insert(id, (format!("{message}"), sender))
1620 });
1621 trace!(
1622 peer=%state.node,
1623 %message,
1624 "Sending request"
1625 );
1626 send(state, message).await?;
1627 Ok(())
1628}
1629
1630async fn handle_broadcast(
1631 state: &mut Established,
1632 (id, broadcasted_msg): (task::Id, Arc<Message>),
1633) -> Result<(), PeerConnectionError> {
1634 if id != tokio::task::id() {
1635 match broadcasted_msg.as_ref() {
1636 #[cfg(feature = "l2")]
1637 l2_msg @ Message::L2(_) => {
1638 handle_l2_broadcast(state, l2_msg).await?;
1639 }
1640 msg => {
1641 error!(
1642 peer=%state.node,
1643 message=%msg,
1644 "Non-supported message broadcasted"
1645 );
1646 let error_message = format!("Non-supported message broadcasted: {msg}");
1647 return Err(PeerConnectionError::BroadcastError(error_message));
1648 }
1649 }
1650 }
1651 Ok(())
1652}
1653
1654async fn handle_block_range_update(state: &mut Established) -> Result<(), PeerConnectionError> {
1655 if should_send_block_range_update(state).await? {
1656 send_block_range_update(state).await
1657 } else {
1658 Ok(())
1659 }
1660}
1661
1662async fn flush_pending_tx_requests(state: &mut Established) -> Result<(), PeerConnectionError> {
1666 if state.pending_tx_requests.is_empty() {
1667 return Ok(());
1668 }
1669
1670 let pending = std::mem::take(&mut state.pending_tx_requests);
1671
1672 let mut all_hashes: Vec<H256> = Vec::new();
1675 let mut all_types: Vec<u8> = Vec::new();
1676 let mut all_sizes: Vec<usize> = Vec::new();
1677
1678 for (announcement, hashes) in &pending {
1679 let trimmed = announcement.filter_to(hashes);
1680 all_hashes.extend_from_slice(&trimmed.transaction_hashes);
1681 all_types.extend_from_slice(&trimmed.transaction_types);
1682 all_sizes.extend(trimmed.transaction_sizes);
1683 }
1684
1685 const MAX_HASHES_PER_REQUEST: usize = 256;
1687 for (i, chunk) in all_hashes.chunks(MAX_HASHES_PER_REQUEST).enumerate() {
1688 let offset = i * MAX_HASHES_PER_REQUEST;
1689 let chunk_types = &all_types[offset..offset + chunk.len()];
1690 let chunk_sizes = &all_sizes[offset..offset + chunk.len()];
1691
1692 let announcement = NewPooledTransactionHashes::from_raw(
1693 chunk_types.to_vec().into(),
1694 chunk_sizes.to_vec(),
1695 chunk.to_vec(),
1696 );
1697 let request = GetPooledTransactions::new(random(), chunk.to_vec());
1698 let request_id = request.id;
1699 if let Err(e) = send(state, Message::GetPooledTransactions(request)).await {
1702 let unsent = &all_hashes[offset..];
1708 if !unsent.is_empty() {
1709 if let Err(clear_err) = state.blockchain.mempool.clear_in_flight_txs(unsent) {
1710 warn!(error = %clear_err, "Failed to clear in-flight transaction tracking after send error");
1711 }
1712 retry_on_alternates(&state.blockchain, &state.peer_table, unsent).await;
1713 }
1714 return Err(e);
1715 }
1716 state
1717 .requested_pooled_txs
1718 .insert(request_id, (announcement, chunk.to_vec(), Instant::now()));
1719 }
1720
1721 Ok(())
1722}
1723
1724async fn retry_on_alternates(
1736 blockchain: &Arc<Blockchain>,
1737 peer_table: &PeerTable,
1738 hashes: &[H256],
1739) {
1740 if hashes.is_empty() {
1741 return;
1742 }
1743 type AltGroup = (PeerConnection, Vec<(H256, u8, usize)>);
1750 let mut by_peer: FxHashMap<H256, AltGroup> = FxHashMap::default();
1751 for hash in hashes {
1752 loop {
1753 let alt = match blockchain.mempool.pop_alternate(*hash) {
1754 Ok(Some(a)) => a,
1755 Ok(None) => break,
1756 Err(e) => {
1757 warn!(error = %e, "pop_alternate failed");
1758 break;
1759 }
1760 };
1761 if let Some((_, list)) = by_peer.get_mut(&alt.peer_id) {
1763 list.push((*hash, alt.tx_type, alt.tx_size));
1764 break;
1765 }
1766 match peer_table.get_peer_connection(alt.peer_id).await {
1767 Ok(Some(conn)) => {
1768 by_peer.insert(alt.peer_id, (conn, vec![(*hash, alt.tx_type, alt.tx_size)]));
1769 break;
1770 }
1771 Ok(None) => continue, Err(e) => {
1773 warn!(error = %e, "get_peer_connection failed");
1774 break;
1775 }
1776 }
1777 }
1778 }
1779
1780 for (_, (conn, entries)) in by_peer {
1781 let mut types = Vec::with_capacity(entries.len());
1782 let mut sizes = Vec::with_capacity(entries.len());
1783 let mut hash_list = Vec::with_capacity(entries.len());
1784 for (h, t, s) in &entries {
1785 hash_list.push(*h);
1786 types.push(*t);
1787 sizes.push(*s);
1788 }
1789 let announcement =
1790 NewPooledTransactionHashes::from_raw(types.into(), sizes, hash_list.clone());
1791 if let Err(e) = conn.enqueue_tx_requests(announcement, hash_list) {
1792 debug!(error = %e, "Failed to enqueue tx requests on alternate peer");
1793 }
1794 }
1795}