ethrex_p2p/rlpx/connection/server.rs

1#[cfg(feature = "l2")]
2use crate::rlpx::l2::{
3    PERIODIC_BATCH_BROADCAST_INTERVAL, PERIODIC_BLOCK_BROADCAST_INTERVAL,
4    l2_connection::{
5        self, L2Cast, L2ConnState, handle_based_capability_message, handle_l2_broadcast,
6    },
7};
8use crate::{
9    backend,
10    metrics::METRICS,
11    network::P2PContext,
12    peer_table::{PeerTable, PeerTableServerProtocol as _},
13    rlpx::{
14        Message,
15        connection::{codec::RLPxCodec, handshake},
16        error::PeerConnectionError,
17        eth::{
18            block_access_lists::{BlockAccessLists, GetBlockAccessLists},
19            blocks::{BlockBodies, BlockHeaders},
20            receipts::{
21                GetReceipts68, GetReceipts70, Receipts68, Receipts69, Receipts70,
22                SOFT_RESPONSE_LIMIT,
23            },
24            status::{StatusMessage68, StatusMessage69, StatusMessage70, StatusMessage71},
25            transactions::{GetPooledTransactions, NewPooledTransactionHashes},
26            update::BlockRangeUpdate,
27        },
28        message::EthCapVersion,
29        p2p::{
30            self, Capability, DisconnectMessage, DisconnectReason, PingMessage, PongMessage,
31            SUPPORTED_ETH_CAPABILITIES, SUPPORTED_SNAP_CAPABILITIES,
32        },
33        snap::TrieNodes,
34    },
35    snap::{
36        process_account_range_request, process_byte_codes_request, process_storage_ranges_request,
37        process_trie_nodes_request,
38    },
39    tx_broadcaster::{TxBroadcaster, TxBroadcasterProtocol as _, send_tx_hashes},
40    types::Node,
41};
42use ethrex_blockchain::Blockchain;
43use ethrex_common::H256;
44#[cfg(feature = "l2")]
45use ethrex_common::types::Transaction;
46use ethrex_common::types::{MempoolTransaction, P2PTransaction, Receipt};
47use ethrex_rlp::encode::RLPEncode;
48use ethrex_storage::{Store, error::StoreError};
49use ethrex_trie::TrieError;
50use futures::{SinkExt as _, Stream, stream::SplitSink};
51use rand::random;
52use rustc_hash::FxHashMap;
53use secp256k1::{PublicKey, SecretKey};
54use spawned_concurrency::{
55    actor,
56    error::ActorError,
57    protocol,
58    tasks::{Actor, ActorRef, ActorStart as _, Context, Handler, send_interval, spawn_listener},
59};
60use spawned_rt::tasks::BroadcastStream;
61use std::{
62    collections::HashMap,
63    net::SocketAddr,
64    sync::{Arc, RwLock},
65    time::{Duration, Instant},
66};
67use tokio::{
68    net::TcpStream,
69    sync::{broadcast, oneshot},
70    task::{self, Id},
71};
72use tokio_stream::StreamExt;
73use tokio_util::codec::Framed;
74use tracing::{debug, error, trace, warn};
75
/// Interval between `Ping` messages sent to an established peer
/// (scheduled with `send_interval` as a `SendPing` tick).
76const PING_INTERVAL: Duration = Duration::from_secs(10);
/// Interval of the periodic `BlockRangeUpdate` tick for an established peer.
77const BLOCK_RANGE_UPDATE_INTERVAL: Duration = Duration::from_secs(60);
/// How often `requested_pooled_txs` is swept for requests older than
/// `INFLIGHT_TX_TIMEOUT`.
78const INFLIGHT_TX_SWEEP_INTERVAL: Duration = Duration::from_secs(15);
/// Age after which an outstanding pooled-transaction request is treated as stale:
/// its hashes are released from in-flight tracking and retried on alternate announcers.
79const INFLIGHT_TX_TIMEOUT: Duration = Duration::from_secs(30);
80/// How often to flush buffered transaction hash requests into a single
81/// batched GetPooledTransactions message.
82const TX_REQUEST_BATCH_INTERVAL: Duration = Duration::from_millis(50);
83/// Fixed (tumbling) time window for incoming request rate limiting.
84const SERVE_REQUEST_WINDOW: Duration = Duration::from_secs(60);
85/// Maximum number of data-serving requests allowed per peer within the rate-limit window.
86const MAX_SERVE_REQUESTS_PER_WINDOW: u64 = 500;
87/// Number of transactions sent to a peer before checking for leeching behaviour.
88const LEECH_TX_SENT_THRESHOLD: u64 = 10_000;
89
/// Sender side of the broadcast channel shared between peer connections.
/// Each message is tagged with the `tokio::task::Id` of the task that sent it, so that
/// task can recognise (and ignore) its own broadcasts.
90pub(crate) type PeerConnBroadcastSender = broadcast::Sender<(tokio::task::Id, Arc<Message>)>;
91
// Messages accepted by the `PeerConnectionServer` actor. The `#[protocol]` macro
// generates one message type per method (used by the handlers as
// `peer_connection_server_protocol::<MethodName>`) and the matching sender methods
// that `PeerConnection` calls on its `ActorRef<PeerConnectionServer>`.
92#[protocol]
93pub trait PeerConnectionServerProtocol: Send + Sync {
    // A message decoded from the peer's stream.
94    fn incoming_message(&self, message: Message) -> Result<(), ActorError>;
    // Send `message` to the peer without waiting for a response.
95    fn outgoing_message(&self, message: Message) -> Result<(), ActorError>;
    // Send a request to the peer; the response is delivered through `sender`.
    // The handler unwraps the `Arc`, so the caller must not keep other clones of it.
96    fn outgoing_request(
97        &self,
98        message: Message,
99        sender: Arc<oneshot::Sender<Message>>,
100    ) -> Result<(), ActorError>;
    // The caller stopped waiting for request `id`; drop its bookkeeping.
101    fn request_timeout(&self, id: u64) -> Result<(), ActorError>;
    // Periodic tick: send a `Ping` to the peer.
102    fn send_ping(&self) -> Result<(), ActorError>;
    // Periodic tick: run the block range update.
103    fn block_range_update(&self) -> Result<(), ActorError>;
    // A broadcast message, tagged with the id of the task that broadcast it.
104    fn broadcast_message(&self, task_id: Id, msg: Arc<Message>) -> Result<(), ActorError>;
    // Periodic tick: expire pooled-transaction requests older than `INFLIGHT_TX_TIMEOUT`.
105    fn sweep_inflight_txs(&self) -> Result<(), ActorError>;
    // Periodic tick: send the buffered transaction requests.
106    fn flush_pending_tx_requests(&self) -> Result<(), ActorError>;
    // Buffer transaction hashes (with their announcement metadata) for the next flush.
107    fn enqueue_tx_requests(
108        &self,
109        announcement: NewPooledTransactionHashes,
110        hashes: Vec<H256>,
111    ) -> Result<(), ActorError>;
112}
113
/// Periodic L2 tick delivered to the connection actor; `msg` selects whether the
/// block broadcast or the batch broadcast is run.
114#[cfg(feature = "l2")]
115#[derive(Clone)]
116pub struct L2Message {
117    pub msg: L2Cast,
118}
119
// Registers `L2Message` as an actor message whose `Result` is unit.
120#[cfg(feature = "l2")]
121impl spawned_concurrency::message::Message for L2Message {
122    type Result = ();
123}
124
/// Cloneable handle to a running `PeerConnectionServer` actor. Its methods forward
/// work to the actor through the wrapped `ActorRef`.
125#[derive(Clone, Debug)]
126pub struct PeerConnection {
    /// Reference used to send protocol messages to the connection actor.
127    handle: ActorRef<PeerConnectionServer>,
128}
129
130impl PeerConnection {
131    pub fn spawn_as_receiver(
132        context: P2PContext,
133        peer_addr: SocketAddr,
134        stream: TcpStream,
135    ) -> PeerConnection {
136        let state = ConnectionState::Receiver(Receiver {
137            context,
138            peer_addr,
139            stream: Arc::new(stream),
140        });
141        let connection = PeerConnectionServer { state };
142        Self {
143            handle: connection.start(),
144        }
145    }
146
147    pub fn spawn_as_initiator(context: P2PContext, node: &Node) -> PeerConnection {
148        let state = ConnectionState::Initiator(Initiator {
149            context,
150            node: node.clone(),
151        });
152        let connection = PeerConnectionServer { state };
153        Self {
154            handle: connection.start(),
155        }
156    }
157
158    pub async fn outgoing_message(&mut self, message: Message) -> Result<(), PeerConnectionError> {
159        self.handle
160            .outgoing_message(message)
161            .map_err(|err| PeerConnectionError::InternalError(err.to_string()))
162    }
163
164    /// Queue tx hashes (with the originating announcement metadata) to be
165    /// requested on the next flush tick. Used as a fallback when an in-flight
166    /// request on another peer fails.
167    pub fn enqueue_tx_requests(
168        &self,
169        announcement: NewPooledTransactionHashes,
170        hashes: Vec<H256>,
171    ) -> Result<(), PeerConnectionError> {
172        self.handle
173            .enqueue_tx_requests(announcement, hashes)
174            .map_err(|err| PeerConnectionError::InternalError(err.to_string()))
175    }
176
177    pub async fn outgoing_request(
178        &mut self,
179        message: Message,
180        timeout: Duration,
181    ) -> Result<Message, PeerConnectionError> {
182        let id = message
183            .request_id()
184            .expect("Cannot wait on request without id");
185        let (oneshot_tx, oneshot_rx) = oneshot::channel::<Message>();
186
187        self.handle
188            .outgoing_request(message, Arc::new(oneshot_tx))
189            .map_err(|err| PeerConnectionError::InternalError(err.to_string()))?;
190
191        // Wait for the response or timeout. This blocks the calling task (and not the ConnectionServer task)
192        match tokio::time::timeout(timeout, oneshot_rx).await {
193            Ok(Ok(response)) => Ok(response),
194            Ok(Err(error)) => Err(PeerConnectionError::RecvError(error.to_string())),
195            Err(_timeout) => {
196                // Notify timeout on request id
197                self.handle
198                    .request_timeout(id)
199                    .map_err(|err| PeerConnectionError::InternalError(err.to_string()))?;
200                // Return timeout error
201                Err(PeerConnectionError::Timeout)
202            }
203        }
204    }
205}
206
/// Initial state of a connection we initiate towards `node`; consumed by
/// `handshake::perform` when the actor starts.
207#[derive(Debug)]
208pub struct Initiator {
209    pub(crate) context: P2PContext,
    /// The remote node this connection targets.
210    pub(crate) node: Node,
211}
212
/// Initial state of an inbound connection accepted from `peer_addr`, wrapping the
/// already-open `stream`; consumed by `handshake::perform` when the actor starts.
213#[derive(Debug)]
214pub struct Receiver {
215    pub(crate) context: P2PContext,
    /// Address the inbound connection came from.
216    pub(crate) peer_addr: SocketAddr,
217    pub(crate) stream: Arc<TcpStream>,
218}
219
/// Per-connection state once the RLPx handshake has completed.
220#[derive(Debug)]
221pub struct Established {
    /// Secret key held by this connection (presumably the local node key used during
    /// the handshake — confirm in `handshake`).
222    pub(crate) signer: SecretKey,
223    /// Sending half of the framed TCP stream to the remote peer.
224    /// The receiving half is consumed by the listener spawned in `initialize_connection`.
225    pub(crate) sink: SplitSink<Framed<TcpStream, RLPxCodec>, Message>,
    /// The remote peer.
226    pub(crate) node: Node,
227    pub(crate) storage: Store,
228    pub(crate) blockchain: Arc<Blockchain>,
    /// Capabilities recorded for this peer (presumably from its `Hello` — confirm);
    /// registered in the peer table on connection.
229    pub(crate) capabilities: Vec<Capability>,
    /// Negotiated `eth` capability; selects the `EthCapVersion` used for decoding.
230    pub(crate) negotiated_eth_capability: Option<Capability>,
    /// Negotiated `snap` capability, if any.
231    pub(crate) negotiated_snap_capability: Option<Capability>,
    /// Block number tracked for block range updates (presumably the last one announced
    /// to the peer — confirm in `handle_block_range_update`).
232    pub(crate) last_block_range_update_block: u64,
233    /// Maps request ID to (original announcement, actually requested hashes, request time).
234    /// The announcement is kept for response validation; the hashes track in-flight state.
235    pub(crate) requested_pooled_txs: HashMap<u64, (NewPooledTransactionHashes, Vec<H256>, Instant)>,
236    /// Buffered transaction requests waiting to be flushed as a single batch.
237    /// Accumulated between flush ticks (TX_REQUEST_BATCH_INTERVAL).
238    pub(crate) pending_tx_requests: Vec<(NewPooledTransactionHashes, Vec<H256>)>,
    /// Client version string (presumably the one the peer reported — confirm).
239    pub(crate) client_version: String,
240    /// Send end of the channel used to broadcast messages
241    /// to other connected peers. It is fine to keep it here,
242    /// since internally it's an Arc.
243    /// The ID is used to ignore messages sent from the same task.
244    /// This is used both to send messages and to receive broadcast
245    /// messages from other connections (sent from other peers).
246    /// The receive end is instantiated after the handshake is completed
247    /// under `handle_peer`.
248    /// TODO: Improve this mechanism
249    /// See https://github.com/lambdaclass/ethrex/issues/3388
250    pub(crate) connection_broadcast_send: PeerConnBroadcastSender,
251    pub(crate) peer_table: PeerTable,
    /// L2 capability state for this peer.
252    #[cfg(feature = "l2")]
253    pub(crate) l2_state: L2ConnState,
    /// Handle to the transaction broadcaster actor.
254    pub(crate) tx_broadcaster: ActorRef<TxBroadcaster>,
    /// Outstanding requests keyed by request id: (message type name, response channel).
    /// Entries are removed when the requester reports a timeout.
255    pub(crate) current_requests: HashMap<u64, (String, oneshot::Sender<Message>)>,
256    /// Disconnection reason, kept so `stopped` can report it in the disconnection metrics.
257    pub(crate) disconnect_reason: Option<DisconnectReason>,
258    /// Whether the peer has been validated (i.e. the connection was established successfully).
259    pub(crate) is_validated: bool,
260    // Rate limiting: start of the current incoming-request window
261    pub(crate) serve_request_window_start: Instant,
262    // Rate limiting: number of data-serving requests received in the current window
263    pub(crate) serve_requests_in_window: u64,
264    // Leech detection: total transactions sent to this peer via GetPooledTransactions responses
265    pub(crate) txs_sent_to_peer: u64,
266    // Leech detection: whether we have received any transactions from this peer
267    pub(crate) received_txs_from_peer: bool,
268}
269
270impl Established {
271    async fn teardown(&mut self) {
272        // Clear any in-flight transaction hashes so other connections can re-request them,
273        // then try to re-issue each pending request to an alternate announcer.
274        // Order matters: clear first so the alternate's reserve_unknown_hashes sees the
275        // hashes as free; otherwise the actor handler can race with clear_in_flight_txs
276        // and silently no-op the retry while consuming an alternates slot.
277        for (_, (_announced, requested_hashes, _)) in self.requested_pooled_txs.drain() {
278            if let Err(e) = self
279                .blockchain
280                .mempool
281                .clear_in_flight_txs(&requested_hashes)
282            {
283                warn!(error = %e, "Failed to clear in-flight transaction tracking during peer teardown");
284            }
285            retry_on_alternates(&self.blockchain, &self.peer_table, &requested_hashes).await;
286        }
287        // Also clear hashes that were buffered but not yet sent.
288        for (_announced, pending_hashes) in self.pending_tx_requests.drain(..) {
289            if let Err(e) = self.blockchain.mempool.clear_in_flight_txs(&pending_hashes) {
290                warn!(error = %e, "Failed to clear in-flight transaction tracking during peer teardown");
291            }
292            retry_on_alternates(&self.blockchain, &self.peer_table, &pending_hashes).await;
293        }
294        // Closing the sink. It may fail if it is already closed (eg. the other side already closed it)
295        // Just logging a debug line if that's the case.
296        let _ = self
297            .sink
298            .close()
299            .await
300            .inspect_err(|err| debug!("Could not close the socket: {err}"));
301    }
302}
303
/// Lifecycle of a connection actor.
///
/// Starts as `Initiator` or `Receiver`, and becomes `Established` after
/// `handshake::perform` succeeds (it stays `Established` even if the follow-up
/// connection initialization fails, so `stopped` can tear it down). A failed
/// handshake leaves `HandshakeFailed`, which also serves as the placeholder while
/// the state is temporarily moved out with `std::mem::replace`.
304#[derive(Debug)]
305pub enum ConnectionState {
306    HandshakeFailed,
307    Initiator(Initiator),
308    Receiver(Receiver),
309    Established(Box<Established>),
310}
311
/// Actor owning a single RLPx peer connection; its behaviour is driven by `state`.
312#[derive(Debug)]
313pub struct PeerConnectionServer {
314    state: ConnectionState,
315}
316
317#[actor(protocol = PeerConnectionServerProtocol)]
318impl PeerConnectionServer {
319    #[started]
320    async fn started(&mut self, ctx: &Context<Self>) {
321        // Set a default eth version that we can update after we negotiate peer capabilities
322        // This eth version will only be used to encode & decode the initial `Hello` messages.
323        let eth_version = Arc::new(RwLock::new(EthCapVersion::default()));
324        // Take ownership of the state, replacing with HandshakeFailed as placeholder
325        let state = std::mem::replace(&mut self.state, ConnectionState::HandshakeFailed);
326        match handshake::perform(state, eth_version.clone()).await {
327            Ok((mut established_state, stream)) => {
328                trace!(peer=%established_state.node, "Starting RLPx connection");
329                if let Err(reason) =
330                    initialize_connection(ctx, &mut established_state, stream, eth_version).await
331                {
332                    match &reason {
333                        PeerConnectionError::NoMatchingCapabilities
334                        | PeerConnectionError::HandshakeError(_) => {
335                            if let Err(e) = established_state
336                                .peer_table
337                                .set_unwanted(established_state.node.node_id())
338                            {
339                                debug!("Failed to set peer as unwanted: {e}");
340                            }
341                        }
342                        _ => {}
343                    }
344                    connection_failed(
345                        &mut established_state,
346                        "Failed to initialize RLPx connection",
347                        &reason,
348                    )
349                    .await;
350
351                    METRICS.record_new_rlpx_conn_failure(reason).await;
352
353                    self.state = ConnectionState::Established(Box::new(established_state));
354                    ctx.stop();
355                } else {
356                    METRICS
357                        .record_new_rlpx_conn_established(
358                            &established_state
359                                .node
360                                .version
361                                .clone()
362                                .unwrap_or("Unknown".to_string()),
363                        )
364                        .await;
365                    established_state.is_validated = true;
366                    // New state
367                    self.state = ConnectionState::Established(Box::new(established_state));
368                }
369            }
370            Err(err) => {
371                // Handshake failed, just log a debug message.
372                // No connection was established so no need to perform any other action
373                debug!("Failed Handshake on RLPx connection {err}");
374                self.state = ConnectionState::HandshakeFailed;
375                ctx.stop();
376            }
377        }
378    }
379
380    #[stopped]
381    async fn stopped(&mut self, _ctx: &Context<Self>) {
382        match std::mem::replace(&mut self.state, ConnectionState::HandshakeFailed) {
383            ConnectionState::Established(mut established_state) => {
384                trace!(peer=%established_state.node, "Closing connection with established peer");
385                if established_state.is_validated {
386                    // If its validated the peer was connected, so we record the disconnection.
387                    let reason = established_state
388                        .disconnect_reason
389                        .unwrap_or(DisconnectReason::NetworkError);
390                    METRICS
391                        .record_new_rlpx_conn_disconnection(
392                            &established_state
393                                .node
394                                .version
395                                .clone()
396                                .unwrap_or("Unknown".to_string()),
397                            reason,
398                        )
399                        .await;
400                }
401                if let Err(e) = established_state
402                    .peer_table
403                    .remove_peer(established_state.node.node_id())
404                {
405                    debug!("Failed to remove peer from table: {e}");
406                }
407                established_state.teardown().await;
408            }
409            _ => {
410                // Nothing to do if the connection was not established
411            }
412        };
413    }
414
415    #[send_handler]
416    async fn handle_incoming_message(
417        &mut self,
418        msg: peer_connection_server_protocol::IncomingMessage,
419        ctx: &Context<Self>,
420    ) {
421        if let ConnectionState::Established(ref mut established_state) = self.state {
422            trace!(
423                peer=%established_state.node,
424                message=%msg.message,
425                "Received incoming message",
426            );
427            let result = handle_incoming_message(established_state, msg.message).await;
428            Self::process_cast_error(&self.state, result, ctx);
429        } else {
430            debug!("Connection not yet established");
431        }
432    }
433
434    #[send_handler]
435    async fn handle_outgoing_message(
436        &mut self,
437        msg: peer_connection_server_protocol::OutgoingMessage,
438        ctx: &Context<Self>,
439    ) {
440        if let ConnectionState::Established(ref mut established_state) = self.state {
441            trace!(
442                peer=%established_state.node,
443                message=%msg.message,
444                "Received outgoing request",
445            );
446            let result = handle_outgoing_message(established_state, msg.message).await;
447            Self::process_cast_error(&self.state, result, ctx);
448        } else {
449            debug!("Connection not yet established");
450        }
451    }
452
453    #[send_handler]
454    async fn handle_outgoing_request(
455        &mut self,
456        msg: peer_connection_server_protocol::OutgoingRequest,
457        ctx: &Context<Self>,
458    ) {
459        if let ConnectionState::Established(ref mut established_state) = self.state {
460            trace!(
461                peer=%established_state.node,
462                message=%msg.message,
463                "Received outgoing request",
464            );
465            let Some(sender) = Arc::<oneshot::Sender<Message>>::into_inner(msg.sender) else {
466                debug!("Could not obtain sender channel: Arc has multiple references");
467                return;
468            };
469            let result = handle_outgoing_request(established_state, msg.message, sender).await;
470            Self::process_cast_error(&self.state, result, ctx);
471        } else {
472            debug!("Connection not yet established");
473        }
474    }
475
476    #[send_handler]
477    async fn handle_request_timeout(
478        &mut self,
479        msg: peer_connection_server_protocol::RequestTimeout,
480        _ctx: &Context<Self>,
481    ) {
482        if let ConnectionState::Established(ref mut established_state) = self.state {
483            // Discard the request from current requests
484            if let Some((msg_type, _)) = established_state.current_requests.remove(&msg.id) {
485                debug!(
486                    peer=%established_state.node,
487                    %msg_type,
488                    id=%msg.id,
489                    "Request timedout",
490                );
491            }
492        } else {
493            debug!("Connection not yet established");
494        }
495    }
496
497    #[send_handler]
498    async fn handle_send_ping(
499        &mut self,
500        _msg: peer_connection_server_protocol::SendPing,
501        ctx: &Context<Self>,
502    ) {
503        if let ConnectionState::Established(ref mut established_state) = self.state {
504            let result = send(established_state, Message::Ping(PingMessage {})).await;
505            Self::process_cast_error(&self.state, result, ctx);
506        } else {
507            debug!("Connection not yet established");
508        }
509    }
510
511    #[send_handler]
512    async fn handle_block_range_update(
513        &mut self,
514        _msg: peer_connection_server_protocol::BlockRangeUpdate,
515        ctx: &Context<Self>,
516    ) {
517        if let ConnectionState::Established(ref mut established_state) = self.state {
518            trace!(
519                peer=%established_state.node,
520                "Block Range Update"
521            );
522            let result = handle_block_range_update(established_state).await;
523            Self::process_cast_error(&self.state, result, ctx);
524        } else {
525            debug!("Connection not yet established");
526        }
527    }
528
529    #[send_handler]
530    async fn handle_sweep_inflight_txs(
531        &mut self,
532        _msg: peer_connection_server_protocol::SweepInflightTxs,
533        _ctx: &Context<Self>,
534    ) {
535        if let ConnectionState::Established(ref mut state) = self.state {
536            let now = Instant::now();
537            let stale_ids: Vec<u64> = state
538                .requested_pooled_txs
539                .iter()
540                .filter(|(_, (_, _, ts))| now.duration_since(*ts) > INFLIGHT_TX_TIMEOUT)
541                .map(|(id, _)| *id)
542                .collect();
543            for id in stale_ids {
544                if let Some((_announced, hashes, _)) = state.requested_pooled_txs.remove(&id) {
545                    // Clear in-flight before retry so the alternate's reserve_unknown_hashes
546                    // doesn't race against still-in-flight state and silently no-op.
547                    if let Err(e) = state.blockchain.mempool.clear_in_flight_txs(&hashes) {
548                        warn!(error = %e, "Failed to clear in-flight transaction tracking while sweeping stale requests");
549                    }
550                    retry_on_alternates(&state.blockchain, &state.peer_table, &hashes).await;
551                }
552            }
553        }
554    }
555
556    #[send_handler]
557    async fn handle_flush_pending_tx_requests(
558        &mut self,
559        _msg: peer_connection_server_protocol::FlushPendingTxRequests,
560        ctx: &Context<Self>,
561    ) {
562        if let ConnectionState::Established(ref mut established_state) = self.state {
563            let result = flush_pending_tx_requests(established_state).await;
564            Self::process_cast_error(&self.state, result, ctx);
565        }
566    }
567
568    #[send_handler]
569    async fn handle_enqueue_tx_requests(
570        &mut self,
571        msg: peer_connection_server_protocol::EnqueueTxRequests,
572        _ctx: &Context<Self>,
573    ) {
574        if let ConnectionState::Established(ref mut state) = self.state {
575            // Re-reserve in-flight against this peer. If any hashes are already
576            // in-flight (race), drop them; we don't want duplicate requests.
577            let to_request: Vec<H256> = match state.blockchain.mempool.reserve_unknown_hashes(
578                &msg.announcement.transaction_hashes,
579                &msg.announcement.transaction_types,
580                &msg.announcement.transaction_sizes,
581                state.node.node_id(),
582            ) {
583                Ok(unknown) => unknown,
584                Err(_) => return,
585            };
586            if to_request.is_empty() {
587                return;
588            }
589            let trimmed = msg.announcement.filter_to(&to_request);
590            state.pending_tx_requests.push((trimmed, to_request));
591        }
592    }
593
594    #[send_handler]
595    async fn handle_broadcast_message(
596        &mut self,
597        msg: peer_connection_server_protocol::BroadcastMessage,
598        ctx: &Context<Self>,
599    ) {
600        if let ConnectionState::Established(ref mut established_state) = self.state {
601            trace!(
602                peer=%established_state.node,
603                message=%msg.msg,
604                "Received broadcasted message",
605            );
606            let result = handle_broadcast(established_state, (msg.task_id, msg.msg)).await;
607            Self::process_cast_error(&self.state, result, ctx);
608        } else {
609            debug!("Connection not yet established");
610        }
611    }
612
613    #[cfg(feature = "l2")]
614    #[send_handler]
615    async fn handle_l2_message(&mut self, msg: L2Message, ctx: &Context<Self>) {
616        if let ConnectionState::Established(ref mut established_state) = self.state {
617            let peer_supports_l2 = established_state.l2_state.connection_state().is_ok();
618            let result = if peer_supports_l2 {
619                trace!(
620                    peer=%established_state.node,
621                    message=?msg.msg,
622                    "Handling cast for L2 msg"
623                );
624                match msg.msg {
625                    L2Cast::BatchBroadcast => {
626                        let res = l2_connection::send_sealed_batch(established_state).await;
627                        res.and(l2_connection::process_batches_on_queue(established_state).await)
628                    }
629                    L2Cast::BlockBroadcast => {
630                        let res = l2_connection::send_new_block(established_state).await;
631                        res.and(l2_connection::process_blocks_on_queue(established_state).await)
632                    }
633                }
634            } else {
635                Err(PeerConnectionError::MessageNotHandled(
636                    "Unknown message or capability not handled".to_string(),
637                ))
638            };
639            Self::process_cast_error(&self.state, result, ctx);
640        } else {
641            debug!("Connection not yet established");
642        }
643    }
644
645    fn process_cast_error(
646        state: &ConnectionState,
647        result: Result<(), PeerConnectionError>,
648        ctx: &Context<Self>,
649    ) {
650        if let Err(e) = result
651            && let ConnectionState::Established(established_state) = state
652        {
653            match e {
654                PeerConnectionError::Disconnected
655                | PeerConnectionError::DisconnectReceived(_)
656                | PeerConnectionError::DisconnectSent(_)
657                | PeerConnectionError::HandshakeError(_)
658                | PeerConnectionError::NoMatchingCapabilities
659                | PeerConnectionError::InvalidPeerId
660                | PeerConnectionError::InvalidMessageLength
661                | PeerConnectionError::StateError(_)
662                | PeerConnectionError::InvalidRecoveryId => {
663                    trace!(peer=%established_state.node, error=e.to_string(), "Peer connection error");
664                    ctx.stop();
665                }
666                PeerConnectionError::IoError(ref io_e)
667                    if io_e.kind() == std::io::ErrorKind::BrokenPipe =>
668                {
669                    // TODO: we need to check if this message is ocurring commonly due to a problem
670                    // with our concurrency model
671                    debug!(peer=%established_state.node, "Broken pipe with peer, disconnected");
672                    ctx.stop();
673                }
674                PeerConnectionError::StoreError(StoreError::Trie(TrieError::InconsistentTree(
675                    _,
676                ))) => {
677                    if established_state.blockchain.is_synced() {
678                        // If we're responding with inconsistent trie while synced, our trie may be broken
679                        // If this error is non sporadic we should investigate
680                        error!(
681                            peer=%established_state.node,
682                            error=%e,
683                            "Inconsistent trie while serving peer request; local state may be corrupted",
684                        );
685                    } else {
686                        // If we're not synced, we expect to have inconsistent trie errors
687                        trace!(
688                            peer=%established_state.node,
689                            error=%e,
690                            "Error handling cast message",
691                        );
692                    }
693                }
694                _ => {
695                    // We should check why we're failling to handle the cast message
696                    debug!(
697                        peer=%established_state.node,
698                        capabilities=?established_state.capabilities,
699                        error=%e,
700                        "Error handling cast message",
701                    );
702                }
703            }
704        }
705    }
706}
707
/// Completes the sub-protocol handshake for a new RLPx connection and starts its
/// background activity.
///
/// Steps, in order:
/// 1. Refuses the connection with `TooManyPeers` when the peer table reports that the
///    target number of peers has been reached.
/// 2. Exchanges Hello messages and negotiates capabilities.
/// 3. Publishes the negotiated eth version through `eth_version` so that subsequent
///    message decoding uses it.
/// 4. Exchanges eth Status messages (`init_capabilities`).
/// 5. Registers the peer in the peer table and announces all pooled transaction hashes.
/// 6. Schedules the periodic ping, block-range-update, in-flight-tx sweep and
///    tx-request flush events (plus L2 block/batch broadcasts when built with the `l2`
///    feature and `l2_state.connection_state()` is `Ok`).
/// 7. Spawns a listener forwarding incoming messages to this actor (stream errors are
///    logged and skipped) and, when an eth capability was negotiated, another one for
///    the connection broadcast channel.
///
/// # Errors
///
/// Returns the first error from any step, including `InternalError` when the
/// `eth_version` lock cannot be acquired for writing (poisoned lock).
async fn initialize_connection<S>(
    ctx: &Context<PeerConnectionServer>,
    state: &mut Established,
    mut stream: S,
    eth_version: Arc<RwLock<EthCapVersion>>,
) -> Result<(), PeerConnectionError>
where
    S: Unpin + Send + Stream<Item = Result<Message, PeerConnectionError>> + 'static,
{
    if state.peer_table.target_peers_reached().await? {
        debug!(peer=%state.node, "Reached target peer connections, discarding.");
        return Err(PeerConnectionError::TooManyPeers);
    }
    exchange_hello_messages(state, &mut stream).await?;

    // Update eth capability version to the negotiated version for further message decoding
    let version = match &state.negotiated_eth_capability {
        Some(cap) if cap == &Capability::eth(68) => EthCapVersion::V68,
        Some(cap) if cap == &Capability::eth(69) => EthCapVersion::V69,
        Some(cap) if cap == &Capability::eth(70) => EthCapVersion::V70,
        Some(cap) if cap == &Capability::eth(71) => EthCapVersion::V71,
        // NOTE(review): exchange_hello_messages fails with NoMatchingCapabilities when no
        // eth version is shared, so this fallback looks unreachable here — confirm.
        _ => EthCapVersion::default(),
    };
    *eth_version
        .write()
        .map_err(|err| PeerConnectionError::InternalError(err.to_string()))? = version;

    init_capabilities(state, &mut stream).await?;

    let mut connection = PeerConnection {
        handle: ctx.actor_ref(),
    };

    state.peer_table.new_connected_peer(
        state.node.clone(),
        connection.clone(),
        state.capabilities.clone(),
    )?;

    trace!(peer=%state.node, "Peer connection initialized.");

    // Announce the hashes of all pooled transactions at connection start
    send_all_pooled_tx_hashes(state, &mut connection).await?;

    // Periodic Pings repeated events.
    send_interval(
        PING_INTERVAL,
        ctx.clone(),
        peer_connection_server_protocol::SendPing,
    );

    // Periodic block range update.
    send_interval(
        BLOCK_RANGE_UPDATE_INTERVAL,
        ctx.clone(),
        peer_connection_server_protocol::BlockRangeUpdate,
    );

    // Periodic sweep of stale in-flight transaction requests.
    send_interval(
        INFLIGHT_TX_SWEEP_INTERVAL,
        ctx.clone(),
        peer_connection_server_protocol::SweepInflightTxs,
    );

    // Periodic flush of buffered transaction requests.
    send_interval(
        TX_REQUEST_BATCH_INTERVAL,
        ctx.clone(),
        peer_connection_server_protocol::FlushPendingTxRequests,
    );

    #[cfg(feature = "l2")]
    // Periodic L2 messages events.
    if state.l2_state.connection_state().is_ok() {
        send_interval(
            PERIODIC_BLOCK_BROADCAST_INTERVAL,
            ctx.clone(),
            L2Message {
                msg: L2Cast::BlockBroadcast,
            },
        );
        send_interval(
            PERIODIC_BATCH_BROADCAST_INTERVAL,
            ctx.clone(),
            L2Message {
                msg: L2Cast::BatchBroadcast,
            },
        );
    }

    // Forward every successfully decoded incoming message to this actor.
    spawn_listener(
        ctx.clone(),
        stream.filter_map(|result| match result {
            Ok(msg) => Some(peer_connection_server_protocol::IncomingMessage { message: msg }),
            Err(e) => {
                debug!(error=?e, "Error receiving RLPx message");
                // Skipping invalid data
                None
            }
        }),
    );

    // Only eth-speaking peers subscribe to the shared connection broadcast channel;
    // lagged/errored broadcast items are dropped by `result.ok()`.
    if state.negotiated_eth_capability.is_some() {
        let stream: BroadcastStream<(Id, Arc<Message>)> =
            BroadcastStream::new(state.connection_broadcast_send.subscribe());
        let message_stream = stream.filter_map(|result| {
            result.ok().map(
                |(id, msg)| peer_connection_server_protocol::BroadcastMessage { task_id: id, msg },
            )
        });
        spawn_listener(ctx.clone(), message_stream);
    }

    Ok(())
}
824
825async fn send_all_pooled_tx_hashes(
826    state: &mut Established,
827    connection: &mut PeerConnection,
828) -> Result<(), PeerConnectionError> {
829    let txs: Vec<MempoolTransaction> = state
830        .blockchain
831        .mempool
832        .get_all_txs_by_sender()?
833        .into_values()
834        .flatten()
835        .filter(|tx| !tx.is_privileged())
836        .collect();
837    if !txs.is_empty() {
838        state
839            .tx_broadcaster
840            .add_txs(
841                txs.iter().map(|tx| tx.hash()).collect(),
842                state.node.node_id(),
843            )
844            .map_err(|e| PeerConnectionError::BroadcastError(e.to_string()))?;
845        send_tx_hashes(
846            txs,
847            state.capabilities.clone(),
848            connection,
849            state.node.node_id(),
850            &state.blockchain,
851        )
852        .await
853        .map_err(|e| PeerConnectionError::SendMessage(e.to_string()))?;
854    }
855    Ok(())
856}
857
858async fn send_block_range_update(state: &mut Established) -> Result<(), PeerConnectionError> {
859    // BlockRangeUpdate was introduced in eth/69
860    if state
861        .negotiated_eth_capability
862        .as_ref()
863        .is_some_and(|eth| eth.version >= 69)
864    {
865        trace!(peer=%state.node, "Sending BlockRangeUpdate");
866        let update = BlockRangeUpdate::new(&state.storage).await?;
867        let lastet_block = update.latest_block;
868        send(state, Message::BlockRangeUpdate(update)).await?;
869        state.last_block_range_update_block = lastet_block - (lastet_block % 32);
870    }
871    Ok(())
872}
873
874async fn should_send_block_range_update(state: &Established) -> Result<bool, PeerConnectionError> {
875    let latest_block = state.storage.get_latest_block_number().await?;
876    if latest_block < state.last_block_range_update_block
877        || latest_block - state.last_block_range_update_block >= 32
878    {
879        return Ok(true);
880    }
881    Ok(false)
882}
883
884async fn init_capabilities<S>(
885    state: &mut Established,
886    stream: &mut S,
887) -> Result<(), PeerConnectionError>
888where
889    S: Unpin + Stream<Item = Result<Message, PeerConnectionError>>,
890{
891    // Sending eth Status if peer supports it
892    if let Some(eth) = state.negotiated_eth_capability.clone() {
893        let status = match eth.version {
894            68 => Message::Status68(StatusMessage68::new(&state.storage).await?),
895            69 => Message::Status69(StatusMessage69::new(&state.storage).await?),
896            70 => Message::Status70(StatusMessage70::new(&state.storage).await?),
897            71 => Message::Status71(StatusMessage71::new(&state.storage).await?),
898            ver => {
899                return Err(PeerConnectionError::HandshakeError(format!(
900                    "Invalid eth version {ver}"
901                )));
902            }
903        };
904        trace!(peer=%state.node, "Sending status");
905        send(state, status).await?;
906        // The next immediate message in the ETH protocol is the
907        // status, reference here:
908        // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#status-0x00
909        let msg = match receive(stream).await {
910            Some(msg) => msg?,
911            None => return Err(PeerConnectionError::Disconnected),
912        };
913        match msg {
914            Message::Status68(msg_data) => {
915                trace!(peer=%state.node, "Received Status(68)");
916                backend::validate_status(msg_data, &state.storage, &eth).await?
917            }
918            Message::Status69(msg_data) => {
919                trace!(peer=%state.node, "Received Status(69)");
920                backend::validate_status(msg_data, &state.storage, &eth).await?
921            }
922            Message::Status70(msg_data) => {
923                trace!(peer=%state.node, "Received Status(70)");
924                backend::validate_status(msg_data, &state.storage, &eth).await?
925            }
926            Message::Status71(msg_data) => {
927                trace!(peer=%state.node, "Received Status(71)");
928                backend::validate_status(msg_data, &state.storage, &eth).await?
929            }
930            Message::Disconnect(disconnect) => {
931                return Err(PeerConnectionError::HandshakeError(format!(
932                    "Peer disconnected due to: {}",
933                    disconnect.reason()
934                )));
935            }
936            _ => {
937                return Err(PeerConnectionError::HandshakeError(
938                    "Expected a Status message".to_string(),
939                ));
940            }
941        }
942    }
943    Ok(())
944}
945
946async fn send_disconnect_message(state: &mut Established, reason: Option<DisconnectReason>) {
947    send(state, Message::Disconnect(DisconnectMessage { reason }))
948        .await
949        .unwrap_or_else(|_| {
950            debug!(
951                peer=%state.node,
952                ?reason,
953                "Could not send Disconnect message",
954            );
955        });
956}
957
958async fn connection_failed(state: &mut Established, error_text: &str, error: &PeerConnectionError) {
959    debug!(
960        peer=%state.node,
961        %error_text,
962        %error,
963        "connection failure"
964    );
965
966    // Send disconnect message only if error is different than RLPxError::DisconnectRequested
967    // because if it is a DisconnectRequested error it means that the peer requested the disconnection, not us.
968    if !matches!(error, PeerConnectionError::DisconnectReceived(_)) {
969        send_disconnect_message(state, match_disconnect_reason(error)).await;
970    }
971
972    // Discard peer from kademlia table in some cases
973    match error {
974        // already connected, don't discard it
975        PeerConnectionError::DisconnectReceived(DisconnectReason::AlreadyConnected)
976        | PeerConnectionError::DisconnectSent(DisconnectReason::AlreadyConnected) => {
977            debug!(
978                peer=%state.node,
979                %error_text,
980                %error,
981                "Peer already connected, don't replace it"
982            );
983        }
984        _ => {
985            debug!(
986                peer=%state.node,
987                %error_text,
988                %error,
989                remote_public_key=%state.node.public_key,
990                "discarding peer",
991            );
992        }
993    }
994}
995
996fn match_disconnect_reason(error: &PeerConnectionError) -> Option<DisconnectReason> {
997    match error {
998        PeerConnectionError::DisconnectSent(reason) => Some(*reason),
999        PeerConnectionError::DisconnectReceived(reason) => Some(*reason),
1000        PeerConnectionError::RLPDecodeError(_) => Some(DisconnectReason::NetworkError),
1001        PeerConnectionError::TooManyPeers => Some(DisconnectReason::TooManyPeers),
1002        // TODO build a proper matching between error types and disconnection reasons
1003        _ => None,
1004    }
1005}
1006
/// Performs the RLPx p2p Hello exchange and negotiates sub-protocol capabilities.
///
/// Sends our Hello first. It advertises every supported eth and snap capability,
/// plus the first `based` capability when built with `l2` and L2 is supported. It
/// also carries the public key derived from `state.signer` and our client version.
/// It then waits for the peer's first message.
///
/// On a Hello reply:
/// - the highest eth and snap versions advertised by the peer that we also support
///   are negotiated and stored in `state`,
/// - under `l2`, a peer advertising `based` marks the L2 state as established,
/// - the peer's full capability list and client id are recorded in `state`.
///
/// # Errors
///
/// - `Disconnected` if the stream ends before a reply arrives.
/// - `DisconnectReceived` if the peer replies with a Disconnect.
/// - `BadRequest` if the reply is any other non-Hello message.
/// - `NoMatchingCapabilities` if no common eth version exists. Note that
///   `state.capabilities` has already been overwritten at that point.
/// - Errors from sending our Hello, from the stream itself, or (under `l2`) from
///   `set_established`.
async fn exchange_hello_messages<S>(
    state: &mut Established,
    stream: &mut S,
) -> Result<(), PeerConnectionError>
where
    S: Unpin + Stream<Item = Result<Message, PeerConnectionError>>,
{
    // `mut` is only exercised when the `l2` feature pushes the based capability
    // below, so the lint is silenced for non-l2 builds.
    #[allow(unused_mut)]
    let mut supported_capabilities: Vec<Capability> = [
        &SUPPORTED_ETH_CAPABILITIES[..],
        &SUPPORTED_SNAP_CAPABILITIES[..],
    ]
    .concat();
    #[cfg(feature = "l2")]
    if state.l2_state.is_supported() {
        supported_capabilities.push(crate::rlpx::l2::SUPPORTED_BASED_CAPABILITIES[0].clone());
    }
    let hello_msg = Message::Hello(p2p::HelloMessage::new(
        supported_capabilities,
        PublicKey::from_secret_key(secp256k1::SECP256K1, &state.signer),
        state.client_version.clone(),
    ));

    send(state, hello_msg).await?;

    // Receive Hello message
    let msg = match receive(stream).await {
        Some(msg) => msg?,
        None => return Err(PeerConnectionError::Disconnected),
    };

    match msg {
        Message::Hello(hello_message) => {
            // 0 means "no common version found yet".
            let mut negotiated_eth_version = 0;
            let mut negotiated_snap_version = 0;

            trace!(
                peer=%state.node,
                capabilities=?hello_message.capabilities,
                "Hello message capabilities",
            );

            // Check if we have any capability in common and store the highest version
            for cap in &hello_message.capabilities {
                match cap.protocol() {
                    "eth" => {
                        if SUPPORTED_ETH_CAPABILITIES.contains(cap)
                            && cap.version > negotiated_eth_version
                        {
                            negotiated_eth_version = cap.version;
                        }
                    }
                    "snap" => {
                        if SUPPORTED_SNAP_CAPABILITIES.contains(cap)
                            && cap.version > negotiated_snap_version
                        {
                            negotiated_snap_version = cap.version;
                        }
                    }
                    #[cfg(feature = "l2")]
                    "based" if state.l2_state.is_supported() => {
                        state.l2_state.set_established()?;
                    }
                    _ => {}
                }
            }

            state.capabilities = hello_message.capabilities;

            // eth is mandatory; snap is optional.
            if negotiated_eth_version == 0 {
                return Err(PeerConnectionError::NoMatchingCapabilities);
            }
            debug!("Negotiated eth version: eth/{}", negotiated_eth_version);
            state.negotiated_eth_capability = Some(Capability::eth(negotiated_eth_version));

            if negotiated_snap_version != 0 {
                debug!("Negotiated snap version: snap/{}", negotiated_snap_version);
                state.negotiated_snap_capability = Some(Capability::snap(negotiated_snap_version));
            }

            state.node.version = Some(hello_message.client_id);

            Ok(())
        }
        Message::Disconnect(disconnect) => {
            Err(PeerConnectionError::DisconnectReceived(disconnect.reason()))
        }
        _ => {
            // Fail if it is not a hello message
            Err(PeerConnectionError::BadRequest(
                "Expected Hello message".to_string(),
            ))
        }
    }
}
1104
1105pub(crate) async fn send(
1106    state: &mut Established,
1107    message: Message,
1108) -> Result<(), PeerConnectionError> {
1109    #[cfg(feature = "metrics")]
1110    {
1111        use ethrex_metrics::p2p::METRICS_P2P;
1112        METRICS_P2P.inc_outgoing_message(message.metric_label());
1113    }
1114    state.sink.send(message).await
1115}
1116
/// Awaits the next item produced by the decoded message `stream`.
///
/// Returns `None` once the stream has ended. The handshake steps in this file that
/// call it (`exchange_hello_messages` and `init_capabilities`) treat `None` as the peer
/// having gone away and fail with `PeerConnectionError::Disconnected`. Errors yielded
/// by the stream itself come back as `Some(Err(_))`.
///
/// NOTE(review): an earlier version of this comment warned that an empty read is not a
/// reliable EOF signal on an established connection and that disconnections should
/// instead be detected by the periodic ping task. That guidance presumably now applies
/// to the `SendPing` interval scheduled in `initialize_connection` — confirm.
async fn receive<S>(stream: &mut S) -> Option<Result<Message, PeerConnectionError>>
where
    S: Unpin + Stream<Item = Result<Message, PeerConnectionError>>,
{
    stream.next().await
}
1134
1135/// Returns true if the peer is within its rate limit for data-serving requests, false if exceeded.
1136/// Resets the window counter when the window duration has elapsed.
1137fn check_serve_request_rate(state: &mut Established) -> bool {
1138    let now = Instant::now();
1139    if now.duration_since(state.serve_request_window_start) >= SERVE_REQUEST_WINDOW {
1140        state.serve_request_window_start = now;
1141        state.serve_requests_in_window = 0;
1142    }
1143    state.serve_requests_in_window += 1;
1144    state.serve_requests_in_window <= MAX_SERVE_REQUESTS_PER_WINDOW
1145}
1146
1147async fn handle_incoming_message(
1148    state: &mut Established,
1149    message: Message,
1150) -> Result<(), PeerConnectionError> {
1151    #[cfg(feature = "metrics")]
1152    {
1153        use ethrex_metrics::p2p::METRICS_P2P;
1154        METRICS_P2P.inc_incoming_message(message.metric_label());
1155    }
1156
1157    // Rate-limit incoming data-serving requests to prevent resource exhaustion.
1158    let is_data_request = matches!(
1159        message,
1160        Message::GetBlockHeaders(_)
1161            | Message::GetBlockBodies(_)
1162            | Message::GetReceipts68(_)
1163            | Message::GetReceipts69(_)
1164            | Message::GetReceipts70(_)
1165            | Message::GetPooledTransactions(_)
1166            | Message::GetAccountRange(_)
1167            | Message::GetStorageRanges(_)
1168            | Message::GetByteCodes(_)
1169            | Message::GetTrieNodes(_)
1170    );
1171    if is_data_request && !check_serve_request_rate(state) {
1172        debug!(
1173            peer = %state.node,
1174            window_requests = state.serve_requests_in_window,
1175            "Disconnecting peer: exceeded incoming request rate limit",
1176        );
1177        send_disconnect_message(state, Some(DisconnectReason::UselessPeer)).await;
1178        return Err(PeerConnectionError::DisconnectSent(
1179            DisconnectReason::UselessPeer,
1180        ));
1181    }
1182
1183    let peer_supports_eth = state.negotiated_eth_capability.is_some();
1184    #[cfg(feature = "l2")]
1185    let peer_supports_l2 = state.l2_state.connection_state().is_ok();
1186    match message {
1187        Message::Disconnect(msg_data) => {
1188            let reason = msg_data.reason();
1189            trace!(
1190                peer=%state.node,
1191                ?reason,
1192                "Received Disconnect"
1193            );
1194            state.disconnect_reason = Some(reason);
1195
1196            // TODO handle the disconnection request
1197
1198            return Err(PeerConnectionError::DisconnectReceived(reason));
1199        }
1200        Message::Ping(_) => {
1201            trace!(peer=%state.node, "Sending pong message");
1202            send(state, Message::Pong(PongMessage {})).await?;
1203        }
1204        Message::Pong(_) => {
1205            // We ignore received Pong messages
1206        }
1207        Message::Status68(msg_data) => {
1208            if let Some(eth) = &state.negotiated_eth_capability {
1209                backend::validate_status(msg_data, &state.storage, eth).await?
1210            };
1211        }
1212        Message::Status69(msg_data) => {
1213            if let Some(eth) = &state.negotiated_eth_capability {
1214                backend::validate_status(msg_data, &state.storage, eth).await?
1215            };
1216        }
1217        Message::Status70(msg_data) => {
1218            if let Some(eth) = &state.negotiated_eth_capability {
1219                backend::validate_status(msg_data, &state.storage, eth).await?
1220            };
1221        }
1222        Message::Status71(msg_data) => {
1223            if let Some(eth) = &state.negotiated_eth_capability {
1224                backend::validate_status(msg_data, &state.storage, eth).await?
1225            };
1226        }
1227        Message::GetAccountRange(req) => {
1228            let response = process_account_range_request(req, state.storage.clone()).await?;
1229            send(state, Message::AccountRange(response)).await?
1230        }
1231        Message::Transactions(txs) if peer_supports_eth => {
1232            // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#transactions-0x02
1233            if !txs.transactions.is_empty() {
1234                state.received_txs_from_peer = true;
1235            }
1236            if state.blockchain.is_synced() {
1237                let tx_hashes: Vec<_> = txs.transactions.iter().map(|tx| tx.hash()).collect();
1238
1239                // Offload pool insertion to a background task so we don't block
1240                // the ConnectionServer (validation + signature recovery are expensive).
1241                let blockchain = state.blockchain.clone();
1242                let peer = state.node.to_string();
1243                #[cfg(feature = "l2")]
1244                let is_l2_mode = state.l2_state.is_supported();
1245                tokio::spawn(async move {
1246                    for tx in txs.transactions {
1247                        #[cfg(feature = "l2")]
1248                        if (is_l2_mode && matches!(tx, Transaction::EIP4844Transaction(_)))
1249                            || tx.is_privileged()
1250                        {
1251                            let tx_type = tx.tx_type();
1252                            debug!(peer=%peer, "Rejecting transaction in L2 mode - {tx_type} transactions are not broadcasted in L2");
1253                            continue;
1254                        }
1255
1256                        if let Err(e) = blockchain.add_transaction_to_pool(tx).await {
1257                            debug!(
1258                                peer=%peer,
1259                                error=%e,
1260                                "Error adding transaction"
1261                            );
1262                        }
1263                    }
1264                });
1265
1266                // Notify the broadcaster immediately — it only tracks hashes
1267                // to avoid re-broadcasting to the sender. The actual broadcast
1268                // happens on a periodic timer that queries the mempool directly.
1269                state
1270                    .tx_broadcaster
1271                    .add_txs(tx_hashes, state.node.node_id())
1272                    .map_err(|e| PeerConnectionError::BroadcastError(e.to_string()))?;
1273            }
1274        }
1275        Message::GetBlockHeaders(msg_data) if peer_supports_eth => {
1276            let response = BlockHeaders {
1277                id: msg_data.id,
1278                block_headers: msg_data.fetch_headers(&state.storage).await,
1279            };
1280            send(state, Message::BlockHeaders(response)).await?;
1281        }
1282        Message::GetBlockBodies(msg_data) if peer_supports_eth => {
1283            let response = BlockBodies {
1284                id: msg_data.id,
1285                block_bodies: msg_data.fetch_blocks(&state.storage).await,
1286            };
1287            send(state, Message::BlockBodies(response)).await?;
1288        }
1289        Message::GetBlockAccessLists(GetBlockAccessLists { id, block_hashes })
1290            if peer_supports_eth =>
1291        {
1292            use crate::rlpx::eth::block_access_lists::BLOCK_ACCESS_LIST_LIMIT;
1293            let mut block_access_lists =
1294                Vec::with_capacity(block_hashes.len().min(BLOCK_ACCESS_LIST_LIMIT));
1295            for hash in &block_hashes {
1296                // EIP-8159: only serve a BAL that matches the block header's
1297                // commitment. A stored BAL that doesn't hash to the header's
1298                // `block_access_list_hash` (e.g. a stale/empty entry from a prior
1299                // regeneration) must be reported as unavailable (`0x80`) rather
1300                // than served as a wrong BAL, which receiving peers would reject.
1301                let bal = match state.storage.get_block_access_list(*hash) {
1302                    Ok(Some(bal)) => {
1303                        let commitment = match state.storage.get_block_header_by_hash(*hash) {
1304                            Ok(Some(header)) => header.block_access_list_hash,
1305                            Ok(None) => None,
1306                            Err(err) => {
1307                                // Don't serve an unverified BAL: degrade to 0x80
1308                                // (unavailable), but log so an operator can tell a
1309                                // committed BAL was refused due to a DB error.
1310                                warn!(
1311                                    "Failed to read header for BAL commitment check (hash {hash:#x}): {err}; reporting BAL unavailable"
1312                                );
1313                                None
1314                            }
1315                        };
1316                        bal.matches_commitment(commitment).then_some(bal)
1317                    }
1318                    Ok(None) => None,
1319                    Err(err) => {
1320                        error!("Error accessing DB while building BAL response for peer: {err}");
1321                        None
1322                    }
1323                };
1324                block_access_lists.push(bal);
1325                if block_access_lists.len() >= BLOCK_ACCESS_LIST_LIMIT {
1326                    break;
1327                }
1328            }
1329            let response = BlockAccessLists::new(id, block_access_lists);
1330            send(state, Message::BlockAccessLists(response)).await?;
1331        }
1332        Message::GetReceipts68(GetReceipts68 { id, block_hashes }) if peer_supports_eth => {
1333            let mut receipts = Vec::new();
1334            for hash in block_hashes.iter() {
1335                receipts.push(state.storage.get_receipts_for_block(hash).await?);
1336            }
1337            send(state, Message::Receipts68(Receipts68::new(id, receipts))).await?;
1338        }
1339        Message::GetReceipts69(GetReceipts68 { id, block_hashes }) if peer_supports_eth => {
1340            let mut receipts = Vec::new();
1341            for hash in block_hashes.iter() {
1342                receipts.push(state.storage.get_receipts_for_block(hash).await?);
1343            }
1344            send(state, Message::Receipts69(Receipts69::new(id, receipts))).await?;
1345        }
1346        // EIP-7975: eth/70 partial receipt requests
1347        Message::GetReceipts70(GetReceipts70 {
1348            id,
1349            first_block_receipt_index,
1350            block_hashes,
1351        }) if peer_supports_eth => {
1352            let block_hashes = &block_hashes[..block_hashes.len().min(256)];
1353            let mut all_receipts: Vec<Vec<Receipt>> = Vec::new();
1354            let mut total_size: usize = 0;
1355            let mut last_block_incomplete = false;
1356
1357            for (i, hash) in block_hashes.iter().enumerate() {
1358                let start_index = if i == 0 { first_block_receipt_index } else { 0 };
1359                let block_receipts = state
1360                    .storage
1361                    .get_receipts_for_block_from_index(hash, start_index, None)
1362                    .await?;
1363
1364                let mut block_receipt_list = Vec::new();
1365                let mut hit_limit = false;
1366                for receipt in block_receipts {
1367                    let receipt_size = receipt.length();
1368                    if total_size + receipt_size > SOFT_RESPONSE_LIMIT
1369                        && (!block_receipt_list.is_empty() || !all_receipts.is_empty())
1370                    {
1371                        hit_limit = true;
1372                        // Only mark incomplete when the current block actually
1373                        // has a partial receipt list. When the limit is hit
1374                        // before any receipt from this block fits, the previous
1375                        // block is complete — setting the flag would cause the
1376                        // peer to re-request an already-complete block.
1377                        if !block_receipt_list.is_empty() {
1378                            last_block_incomplete = true;
1379                        }
1380                        break;
1381                    }
1382                    total_size += receipt_size;
1383                    block_receipt_list.push(receipt);
1384                }
1385
1386                // Don't push an empty list when the limit was hit before any
1387                // receipt from this block could be included — an empty trailing
1388                // list would mislead the peer into thinking the block has no
1389                // transactions.
1390                if !block_receipt_list.is_empty() || !hit_limit {
1391                    all_receipts.push(block_receipt_list);
1392                }
1393
1394                if hit_limit {
1395                    break;
1396                }
1397            }
1398
1399            let response =
1400                Message::Receipts70(Receipts70::new(id, last_block_incomplete, all_receipts));
1401            send(state, response).await?;
1402        }
1403        Message::BlockRangeUpdate(update) => {
1404            trace!(
1405                peer=%state.node,
1406                range_from=update.earliest_block,
1407                range_to=update.latest_block,
1408                "Block range update",
1409            );
1410            // We will only validate the incoming update, we may decide to store and use this information in the future
1411            if let Err(err) = update.validate() {
1412                debug!(
1413                    peer=%state.node,
1414                    reason=%err,
1415                    "Disconnecting peer: invalid block range update",
1416                );
1417                send_disconnect_message(state, Some(DisconnectReason::SubprotocolError)).await;
1418                return Err(PeerConnectionError::DisconnectSent(
1419                    DisconnectReason::SubprotocolError,
1420                ));
1421            }
1422        }
1423        Message::NewPooledTransactionHashes(new_pooled_transaction_hashes) if peer_supports_eth => {
1424            // Don't request transactions if we're not synced — we won't be building blocks soon.
1425            if state.blockchain.is_synced() {
1426                let hashes = new_pooled_transaction_hashes
1427                    .get_transactions_to_request(&state.blockchain, state.node.node_id())?;
1428                if !hashes.is_empty() {
1429                    // Buffer hashes for batched requesting instead of sending immediately.
1430                    // The periodic flush_pending_tx_requests handler will send them.
1431                    state
1432                        .pending_tx_requests
1433                        .push((new_pooled_transaction_hashes, hashes));
1434                }
1435            }
1436        }
1437        Message::GetPooledTransactions(msg) => {
1438            let response = msg.handle(&state.blockchain)?;
1439            let batch_size = response.pooled_transactions.len() as u64;
1440            // Leech detection: disconnect peers that drain transactions but never contribute any.
1441            if state.txs_sent_to_peer + batch_size > LEECH_TX_SENT_THRESHOLD
1442                && !state.received_txs_from_peer
1443            {
1444                debug!(
1445                    peer = %state.node,
1446                    txs_sent = state.txs_sent_to_peer,
1447                    "Disconnecting peer: leech detected (sent many txs but received none)",
1448                );
1449                send_disconnect_message(state, Some(DisconnectReason::UselessPeer)).await;
1450                return Err(PeerConnectionError::DisconnectSent(
1451                    DisconnectReason::UselessPeer,
1452                ));
1453            }
1454            send(state, Message::PooledTransactions(response)).await?;
1455            state.txs_sent_to_peer += batch_size;
1456        }
1457        Message::PooledTransactions(msg) if peer_supports_eth => {
1458            if !msg.pooled_transactions.is_empty() {
1459                state.received_txs_from_peer = true;
1460            }
1461            // Always clear in-flight tracking for this response, regardless of sync status,
1462            // so other connections can re-request these hashes if needed.
1463            let removed_request = state.requested_pooled_txs.remove(&msg.id);
1464            if let Some((_, ref requested_hashes, _)) = removed_request {
1465                state
1466                    .blockchain
1467                    .mempool
1468                    .clear_in_flight_txs(requested_hashes)?;
1469            }
1470            // If we receive a blob transaction without blobs or with blobs that don't match the versioned hashes we must disconnect from the peer
1471            for tx in &msg.pooled_transactions {
1472                if let P2PTransaction::EIP4844TransactionWithBlobs(itx) = tx
1473                    && (itx.blobs_bundle.is_empty()
1474                        || itx
1475                            .blobs_bundle
1476                            .validate_blob_commitment_hashes(&itx.tx.blob_versioned_hashes)
1477                            .is_err())
1478                {
1479                    debug!(
1480                        peer=%state.node,
1481                        "Disconnecting peer: invalid or missing blobs",
1482                    );
1483                    if let Some((_announced, requested_hashes, _)) = &removed_request {
1484                        retry_on_alternates(&state.blockchain, &state.peer_table, requested_hashes)
1485                            .await;
1486                    }
1487                    send_disconnect_message(state, Some(DisconnectReason::SubprotocolError)).await;
1488                    return Err(PeerConnectionError::DisconnectSent(
1489                        DisconnectReason::SubprotocolError,
1490                    ));
1491                }
1492            }
1493            if state.blockchain.is_synced() {
1494                if let Some((announced, requested_hashes, _)) = &removed_request {
1495                    let fork = state.blockchain.current_fork().await?;
1496                    if let Err(error) = msg.validate_requested(announced, fork) {
1497                        debug!(
1498                            peer=%state.node,
1499                            reason=%error,
1500                            "Disconnecting peer: invalid pooled transactions response",
1501                        );
1502                        retry_on_alternates(&state.blockchain, &state.peer_table, requested_hashes)
1503                            .await;
1504                        send_disconnect_message(state, Some(DisconnectReason::SubprotocolError))
1505                            .await;
1506                        return Err(PeerConnectionError::DisconnectSent(
1507                            DisconnectReason::SubprotocolError,
1508                        ));
1509                    }
1510                }
1511                #[cfg(feature = "l2")]
1512                let is_l2_mode = state.l2_state.is_supported();
1513
1514                #[cfg(not(feature = "l2"))]
1515                let is_l2_mode = false;
1516                if let Err(error) = msg.handle(&state.node, &state.blockchain, is_l2_mode).await {
1517                    if matches!(
1518                        error,
1519                        ethrex_blockchain::error::MempoolError::BlobsBundleError(_)
1520                    ) {
1521                        debug!(
1522                            peer=%state.node,
1523                            reason=%error,
1524                            "Disconnecting peer: invalid pooled transactions response",
1525                        );
1526                        if let Some((_announced, requested_hashes, _)) = &removed_request {
1527                            retry_on_alternates(
1528                                &state.blockchain,
1529                                &state.peer_table,
1530                                requested_hashes,
1531                            )
1532                            .await;
1533                        }
1534                        send_disconnect_message(state, Some(DisconnectReason::SubprotocolError))
1535                            .await;
1536                        return Err(PeerConnectionError::DisconnectSent(
1537                            DisconnectReason::SubprotocolError,
1538                        ));
1539                    }
1540                    return Err(error.into());
1541                }
1542            }
1543        }
1544        Message::GetStorageRanges(req) => {
1545            let response = process_storage_ranges_request(req, state.storage.clone()).await?;
1546            send(state, Message::StorageRanges(response)).await?
1547        }
1548        Message::GetByteCodes(req) => {
1549            let storage_clone = state.storage.clone();
1550            let response = process_byte_codes_request(req, storage_clone)
1551                .await
1552                .map_err(|_| {
1553                    PeerConnectionError::InternalError(
1554                        "Failed to execute bytecode retrieval task".to_string(),
1555                    )
1556                })?;
1557            send(state, Message::ByteCodes(response)).await?
1558        }
1559        Message::GetTrieNodes(req) => {
1560            let id = req.id;
1561            match process_trie_nodes_request(req, state.storage.clone()).await {
1562                Ok(response) => send(state, Message::TrieNodes(response)).await?,
1563                Err(_) => send(state, Message::TrieNodes(TrieNodes { id, nodes: vec![] })).await?,
1564            }
1565        }
1566        #[cfg(feature = "l2")]
1567        Message::L2(req) if peer_supports_l2 => {
1568            handle_based_capability_message(state, req).await?;
1569        }
1570        // Send response messages to the backend
1571        message @ Message::AccountRange(_)
1572        | message @ Message::StorageRanges(_)
1573        | message @ Message::ByteCodes(_)
1574        | message @ Message::TrieNodes(_)
1575        | message @ Message::BlockBodies(_)
1576        | message @ Message::BlockHeaders(_)
1577        | message @ Message::Receipts68(_)
1578        | message @ Message::Receipts69(_)
1579        | message @ Message::Receipts70(_)
1580        | message @ Message::BlockAccessLists(_) => {
1581            if let Some((_, tx)) = message
1582                .request_id()
1583                .and_then(|id| state.current_requests.remove(&id))
1584            {
1585                tx.send(message)
1586                    .map_err(|e| PeerConnectionError::SendMessage(e.to_string()))?
1587            } else {
1588                return Err(PeerConnectionError::ExpectedRequestId(format!("{message}")));
1589            }
1590        }
1591        // TODO: Add new message types and handlers as they are implemented
1592        message => return Err(PeerConnectionError::MessageNotHandled(format!("{message}"))),
1593    };
1594    Ok(())
1595}
1596
1597async fn handle_outgoing_message(
1598    state: &mut Established,
1599    message: Message,
1600) -> Result<(), PeerConnectionError> {
1601    trace!(
1602        peer=%state.node,
1603        %message,
1604        "Sending message"
1605    );
1606    send(state, message).await?;
1607    Ok(())
1608}
1609
1610async fn handle_outgoing_request(
1611    state: &mut Established,
1612    message: Message,
1613    sender: oneshot::Sender<Message>,
1614) -> Result<(), PeerConnectionError> {
1615    // Insert the request in the request map if it supports a request id.
1616    message.request_id().and_then(|id| {
1617        state
1618            .current_requests
1619            .insert(id, (format!("{message}"), sender))
1620    });
1621    trace!(
1622        peer=%state.node,
1623        %message,
1624        "Sending request"
1625    );
1626    send(state, message).await?;
1627    Ok(())
1628}
1629
1630async fn handle_broadcast(
1631    state: &mut Established,
1632    (id, broadcasted_msg): (task::Id, Arc<Message>),
1633) -> Result<(), PeerConnectionError> {
1634    if id != tokio::task::id() {
1635        match broadcasted_msg.as_ref() {
1636            #[cfg(feature = "l2")]
1637            l2_msg @ Message::L2(_) => {
1638                handle_l2_broadcast(state, l2_msg).await?;
1639            }
1640            msg => {
1641                error!(
1642                    peer=%state.node,
1643                    message=%msg,
1644                    "Non-supported message broadcasted"
1645                );
1646                let error_message = format!("Non-supported message broadcasted: {msg}");
1647                return Err(PeerConnectionError::BroadcastError(error_message));
1648            }
1649        }
1650    }
1651    Ok(())
1652}
1653
1654async fn handle_block_range_update(state: &mut Established) -> Result<(), PeerConnectionError> {
1655    if should_send_block_range_update(state).await? {
1656        send_block_range_update(state).await
1657    } else {
1658        Ok(())
1659    }
1660}
1661
1662/// Drains the pending transaction request buffer and sends batched
1663/// GetPooledTransactions requests, respecting the 256-hash-per-request
1664/// limit from the devp2p ETH spec.
1665async fn flush_pending_tx_requests(state: &mut Established) -> Result<(), PeerConnectionError> {
1666    if state.pending_tx_requests.is_empty() {
1667        return Ok(());
1668    }
1669
1670    let pending = std::mem::take(&mut state.pending_tx_requests);
1671
1672    // Build a trimmed announcement containing only the hashes we're actually requesting,
1673    // with their original types and sizes for response validation.
1674    let mut all_hashes: Vec<H256> = Vec::new();
1675    let mut all_types: Vec<u8> = Vec::new();
1676    let mut all_sizes: Vec<usize> = Vec::new();
1677
1678    for (announcement, hashes) in &pending {
1679        let trimmed = announcement.filter_to(hashes);
1680        all_hashes.extend_from_slice(&trimmed.transaction_hashes);
1681        all_types.extend_from_slice(&trimmed.transaction_types);
1682        all_sizes.extend(trimmed.transaction_sizes);
1683    }
1684
1685    // Send in chunks of MAX_HASHES_PER_REQUEST per the devp2p spec.
1686    const MAX_HASHES_PER_REQUEST: usize = 256;
1687    for (i, chunk) in all_hashes.chunks(MAX_HASHES_PER_REQUEST).enumerate() {
1688        let offset = i * MAX_HASHES_PER_REQUEST;
1689        let chunk_types = &all_types[offset..offset + chunk.len()];
1690        let chunk_sizes = &all_sizes[offset..offset + chunk.len()];
1691
1692        let announcement = NewPooledTransactionHashes::from_raw(
1693            chunk_types.to_vec().into(),
1694            chunk_sizes.to_vec(),
1695            chunk.to_vec(),
1696        );
1697        let request = GetPooledTransactions::new(random(), chunk.to_vec());
1698        let request_id = request.id;
1699        // Send first, only register in requested_pooled_txs on success.
1700        // This ensures we never track hashes for messages that were not transmitted.
1701        if let Err(e) = send(state, Message::GetPooledTransactions(request)).await {
1702            // Clear in-flight for the current chunk (failed to send) and all remaining chunks,
1703            // then try alternate announcers. Order matters: clear first so the alternate's
1704            // reserve_unknown_hashes sees the hashes as free.
1705            // Build an announcement covering every unsent hash (later chunks too) so the
1706            // alternate can validate its response against the original type/size metadata.
1707            let unsent = &all_hashes[offset..];
1708            if !unsent.is_empty() {
1709                if let Err(clear_err) = state.blockchain.mempool.clear_in_flight_txs(unsent) {
1710                    warn!(error = %clear_err, "Failed to clear in-flight transaction tracking after send error");
1711                }
1712                retry_on_alternates(&state.blockchain, &state.peer_table, unsent).await;
1713            }
1714            return Err(e);
1715        }
1716        state
1717            .requested_pooled_txs
1718            .insert(request_id, (announcement, chunk.to_vec(), Instant::now()));
1719    }
1720
1721    Ok(())
1722}
1723
1724/// For each hash that has a remaining alternate announcer, look up that
1725/// peer's connection and enqueue the request there. Each alternate carries
1726/// the (type, size) metadata it originally announced, so the retry request
1727/// is built from the alternate's own announcement rather than the failing
1728/// peer's; otherwise validation against the failing peer's sizes would
1729/// reject the alternate's response when the two announcements differ (e.g.
1730/// bare blob tx vs full sidecar).
1731///
1732/// If a popped alternate is no longer reachable, keep popping until a live
1733/// peer is found or alternates for that hash are exhausted, so a disconnected
1734/// alternate doesn't burn the only fallback slot.
1735async fn retry_on_alternates(
1736    blockchain: &Arc<Blockchain>,
1737    peer_table: &PeerTable,
1738    hashes: &[H256],
1739) {
1740    if hashes.is_empty() {
1741        return;
1742    }
1743    // Group hashes by chosen live alternate, carrying their own type/size.
1744    // We walk per-hash so a dead alternate for hash X doesn't consume the
1745    // slot that hash Y could use. The `PeerConnection` handle from the
1746    // liveness probe is stashed in `by_peer` and reused at enqueue time,
1747    // so there's no second lookup (and no race where the connection drops
1748    // between probe and use).
1749    type AltGroup = (PeerConnection, Vec<(H256, u8, usize)>);
1750    let mut by_peer: FxHashMap<H256, AltGroup> = FxHashMap::default();
1751    for hash in hashes {
1752        loop {
1753            let alt = match blockchain.mempool.pop_alternate(*hash) {
1754                Ok(Some(a)) => a,
1755                Ok(None) => break,
1756                Err(e) => {
1757                    warn!(error = %e, "pop_alternate failed");
1758                    break;
1759                }
1760            };
1761            // Reuse the connection we already grabbed for this peer.
1762            if let Some((_, list)) = by_peer.get_mut(&alt.peer_id) {
1763                list.push((*hash, alt.tx_type, alt.tx_size));
1764                break;
1765            }
1766            match peer_table.get_peer_connection(alt.peer_id).await {
1767                Ok(Some(conn)) => {
1768                    by_peer.insert(alt.peer_id, (conn, vec![(*hash, alt.tx_type, alt.tx_size)]));
1769                    break;
1770                }
1771                Ok(None) => continue, // dead peer, try next alternate
1772                Err(e) => {
1773                    warn!(error = %e, "get_peer_connection failed");
1774                    break;
1775                }
1776            }
1777        }
1778    }
1779
1780    for (_, (conn, entries)) in by_peer {
1781        let mut types = Vec::with_capacity(entries.len());
1782        let mut sizes = Vec::with_capacity(entries.len());
1783        let mut hash_list = Vec::with_capacity(entries.len());
1784        for (h, t, s) in &entries {
1785            hash_list.push(*h);
1786            types.push(*t);
1787            sizes.push(*s);
1788        }
1789        let announcement =
1790            NewPooledTransactionHashes::from_raw(types.into(), sizes, hash_list.clone());
1791        if let Err(e) = conn.enqueue_tx_requests(announcement, hash_list) {
1792            debug!(error = %e, "Failed to enqueue tx requests on alternate peer");
1793        }
1794    }
1795}