#[cfg(feature = "l2")]
use crate::rlpx::l2::{
PERIODIC_BATCH_BROADCAST_INTERVAL, PERIODIC_BLOCK_BROADCAST_INTERVAL,
l2_connection::{
self, L2Cast, L2ConnState, handle_based_capability_message, handle_l2_broadcast,
},
};
use crate::{
backend,
metrics::METRICS,
network::P2PContext,
peer_table::{PeerTable, PeerTableServerProtocol as _},
rlpx::{
Message,
connection::{codec::RLPxCodec, handshake},
error::PeerConnectionError,
eth::{
block_access_lists::{BlockAccessLists, GetBlockAccessLists},
blocks::{BlockBodies, BlockHeaders},
receipts::{
GetReceipts68, GetReceipts70, Receipts68, Receipts69, Receipts70,
SOFT_RESPONSE_LIMIT,
},
status::{StatusMessage68, StatusMessage69, StatusMessage70, StatusMessage71},
transactions::{GetPooledTransactions, NewPooledTransactionHashes},
update::BlockRangeUpdate,
},
message::EthCapVersion,
p2p::{
self, Capability, DisconnectMessage, DisconnectReason, PingMessage, PongMessage,
SUPPORTED_ETH_CAPABILITIES, SUPPORTED_SNAP_CAPABILITIES,
},
snap::TrieNodes,
},
snap::{
process_account_range_request, process_byte_codes_request, process_storage_ranges_request,
process_trie_nodes_request,
},
tx_broadcaster::{TxBroadcaster, TxBroadcasterProtocol as _, send_tx_hashes},
types::Node,
};
use ethrex_blockchain::Blockchain;
use ethrex_common::H256;
#[cfg(feature = "l2")]
use ethrex_common::types::Transaction;
use ethrex_common::types::{MempoolTransaction, P2PTransaction, Receipt};
use ethrex_rlp::encode::RLPEncode;
use ethrex_storage::{Store, error::StoreError};
use ethrex_trie::TrieError;
use futures::{SinkExt as _, Stream, stream::SplitSink};
use rand::random;
use rustc_hash::FxHashMap;
use secp256k1::{PublicKey, SecretKey};
use spawned_concurrency::{
actor,
error::ActorError,
protocol,
tasks::{Actor, ActorRef, ActorStart as _, Context, Handler, send_interval, spawn_listener},
};
use spawned_rt::tasks::BroadcastStream;
use std::{
collections::HashMap,
net::SocketAddr,
sync::{Arc, RwLock},
time::{Duration, Instant},
};
use tokio::{
net::TcpStream,
sync::{broadcast, oneshot},
task::{self, Id},
};
use tokio_stream::StreamExt;
use tokio_util::codec::Framed;
use tracing::{debug, error, trace, warn};
const PING_INTERVAL: Duration = Duration::from_secs(10);
const BLOCK_RANGE_UPDATE_INTERVAL: Duration = Duration::from_secs(60);
const INFLIGHT_TX_SWEEP_INTERVAL: Duration = Duration::from_secs(15);
const INFLIGHT_TX_TIMEOUT: Duration = Duration::from_secs(30);
const TX_REQUEST_BATCH_INTERVAL: Duration = Duration::from_millis(50);
const SERVE_REQUEST_WINDOW: Duration = Duration::from_secs(60);
const MAX_SERVE_REQUESTS_PER_WINDOW: u64 = 500;
const LEECH_TX_SENT_THRESHOLD: u64 = 10_000;
pub(crate) type PeerConnBroadcastSender = broadcast::Sender<(tokio::task::Id, Arc<Message>)>;
#[protocol]
pub trait PeerConnectionServerProtocol: Send + Sync {
fn incoming_message(&self, message: Message) -> Result<(), ActorError>;
fn outgoing_message(&self, message: Message) -> Result<(), ActorError>;
fn outgoing_request(
&self,
message: Message,
sender: Arc<oneshot::Sender<Message>>,
) -> Result<(), ActorError>;
fn request_timeout(&self, id: u64) -> Result<(), ActorError>;
fn send_ping(&self) -> Result<(), ActorError>;
fn block_range_update(&self) -> Result<(), ActorError>;
fn broadcast_message(&self, task_id: Id, msg: Arc<Message>) -> Result<(), ActorError>;
fn sweep_inflight_txs(&self) -> Result<(), ActorError>;
fn flush_pending_tx_requests(&self) -> Result<(), ActorError>;
fn enqueue_tx_requests(
&self,
announcement: NewPooledTransactionHashes,
hashes: Vec<H256>,
) -> Result<(), ActorError>;
}
#[cfg(feature = "l2")]
#[derive(Clone)]
pub struct L2Message {
pub msg: L2Cast,
}
#[cfg(feature = "l2")]
impl spawned_concurrency::message::Message for L2Message {
type Result = ();
}
#[derive(Clone, Debug)]
pub struct PeerConnection {
handle: ActorRef<PeerConnectionServer>,
}
impl PeerConnection {
pub fn spawn_as_receiver(
context: P2PContext,
peer_addr: SocketAddr,
stream: TcpStream,
) -> PeerConnection {
let state = ConnectionState::Receiver(Receiver {
context,
peer_addr,
stream: Arc::new(stream),
});
let connection = PeerConnectionServer { state };
Self {
handle: connection.start(),
}
}
pub fn spawn_as_initiator(context: P2PContext, node: &Node) -> PeerConnection {
let state = ConnectionState::Initiator(Initiator {
context,
node: node.clone(),
});
let connection = PeerConnectionServer { state };
Self {
handle: connection.start(),
}
}
pub async fn outgoing_message(&mut self, message: Message) -> Result<(), PeerConnectionError> {
self.handle
.outgoing_message(message)
.map_err(|err| PeerConnectionError::InternalError(err.to_string()))
}
pub fn enqueue_tx_requests(
&self,
announcement: NewPooledTransactionHashes,
hashes: Vec<H256>,
) -> Result<(), PeerConnectionError> {
self.handle
.enqueue_tx_requests(announcement, hashes)
.map_err(|err| PeerConnectionError::InternalError(err.to_string()))
}
pub async fn outgoing_request(
&mut self,
message: Message,
timeout: Duration,
) -> Result<Message, PeerConnectionError> {
let id = message
.request_id()
.expect("Cannot wait on request without id");
let (oneshot_tx, oneshot_rx) = oneshot::channel::<Message>();
self.handle
.outgoing_request(message, Arc::new(oneshot_tx))
.map_err(|err| PeerConnectionError::InternalError(err.to_string()))?;
match tokio::time::timeout(timeout, oneshot_rx).await {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(PeerConnectionError::RecvError(error.to_string())),
Err(_timeout) => {
self.handle
.request_timeout(id)
.map_err(|err| PeerConnectionError::InternalError(err.to_string()))?;
Err(PeerConnectionError::Timeout)
}
}
}
}
#[derive(Debug)]
pub struct Initiator {
pub(crate) context: P2PContext,
pub(crate) node: Node,
}
#[derive(Debug)]
pub struct Receiver {
pub(crate) context: P2PContext,
pub(crate) peer_addr: SocketAddr,
pub(crate) stream: Arc<TcpStream>,
}
#[derive(Debug)]
pub struct Established {
pub(crate) signer: SecretKey,
pub(crate) sink: SplitSink<Framed<TcpStream, RLPxCodec>, Message>,
pub(crate) node: Node,
pub(crate) storage: Store,
pub(crate) blockchain: Arc<Blockchain>,
pub(crate) capabilities: Vec<Capability>,
pub(crate) negotiated_eth_capability: Option<Capability>,
pub(crate) negotiated_snap_capability: Option<Capability>,
pub(crate) last_block_range_update_block: u64,
pub(crate) requested_pooled_txs: HashMap<u64, (NewPooledTransactionHashes, Vec<H256>, Instant)>,
pub(crate) pending_tx_requests: Vec<(NewPooledTransactionHashes, Vec<H256>)>,
pub(crate) client_version: String,
pub(crate) connection_broadcast_send: PeerConnBroadcastSender,
pub(crate) peer_table: PeerTable,
#[cfg(feature = "l2")]
pub(crate) l2_state: L2ConnState,
pub(crate) tx_broadcaster: ActorRef<TxBroadcaster>,
pub(crate) current_requests: HashMap<u64, (String, oneshot::Sender<Message>)>,
pub(crate) disconnect_reason: Option<DisconnectReason>,
pub(crate) is_validated: bool,
pub(crate) serve_request_window_start: Instant,
pub(crate) serve_requests_in_window: u64,
pub(crate) txs_sent_to_peer: u64,
pub(crate) received_txs_from_peer: bool,
}
impl Established {
async fn teardown(&mut self) {
for (_, (_announced, requested_hashes, _)) in self.requested_pooled_txs.drain() {
if let Err(e) = self
.blockchain
.mempool
.clear_in_flight_txs(&requested_hashes)
{
warn!(error = %e, "Failed to clear in-flight transaction tracking during peer teardown");
}
retry_on_alternates(&self.blockchain, &self.peer_table, &requested_hashes).await;
}
for (_announced, pending_hashes) in self.pending_tx_requests.drain(..) {
if let Err(e) = self.blockchain.mempool.clear_in_flight_txs(&pending_hashes) {
warn!(error = %e, "Failed to clear in-flight transaction tracking during peer teardown");
}
retry_on_alternates(&self.blockchain, &self.peer_table, &pending_hashes).await;
}
let _ = self
.sink
.close()
.await
.inspect_err(|err| debug!("Could not close the socket: {err}"));
}
}
#[derive(Debug)]
pub enum ConnectionState {
HandshakeFailed,
Initiator(Initiator),
Receiver(Receiver),
Established(Box<Established>),
}
#[derive(Debug)]
pub struct PeerConnectionServer {
state: ConnectionState,
}
#[actor(protocol = PeerConnectionServerProtocol)]
impl PeerConnectionServer {
#[started]
async fn started(&mut self, ctx: &Context<Self>) {
let eth_version = Arc::new(RwLock::new(EthCapVersion::default()));
let state = std::mem::replace(&mut self.state, ConnectionState::HandshakeFailed);
match handshake::perform(state, eth_version.clone()).await {
Ok((mut established_state, stream)) => {
trace!(peer=%established_state.node, "Starting RLPx connection");
if let Err(reason) =
initialize_connection(ctx, &mut established_state, stream, eth_version).await
{
match &reason {
PeerConnectionError::NoMatchingCapabilities
| PeerConnectionError::HandshakeError(_) => {
if let Err(e) = established_state
.peer_table
.set_unwanted(established_state.node.node_id())
{
debug!("Failed to set peer as unwanted: {e}");
}
}
_ => {}
}
connection_failed(
&mut established_state,
"Failed to initialize RLPx connection",
&reason,
)
.await;
METRICS.record_new_rlpx_conn_failure(reason).await;
self.state = ConnectionState::Established(Box::new(established_state));
ctx.stop();
} else {
METRICS
.record_new_rlpx_conn_established(
&established_state
.node
.version
.clone()
.unwrap_or("Unknown".to_string()),
)
.await;
established_state.is_validated = true;
self.state = ConnectionState::Established(Box::new(established_state));
}
}
Err(err) => {
debug!("Failed Handshake on RLPx connection {err}");
self.state = ConnectionState::HandshakeFailed;
ctx.stop();
}
}
}
#[stopped]
async fn stopped(&mut self, _ctx: &Context<Self>) {
match std::mem::replace(&mut self.state, ConnectionState::HandshakeFailed) {
ConnectionState::Established(mut established_state) => {
trace!(peer=%established_state.node, "Closing connection with established peer");
if established_state.is_validated {
let reason = established_state
.disconnect_reason
.unwrap_or(DisconnectReason::NetworkError);
METRICS
.record_new_rlpx_conn_disconnection(
&established_state
.node
.version
.clone()
.unwrap_or("Unknown".to_string()),
reason,
)
.await;
}
if let Err(e) = established_state
.peer_table
.remove_peer(established_state.node.node_id())
{
debug!("Failed to remove peer from table: {e}");
}
established_state.teardown().await;
}
_ => {
}
};
}
#[send_handler]
async fn handle_incoming_message(
&mut self,
msg: peer_connection_server_protocol::IncomingMessage,
ctx: &Context<Self>,
) {
if let ConnectionState::Established(ref mut established_state) = self.state {
trace!(
peer=%established_state.node,
message=%msg.message,
"Received incoming message",
);
let result = handle_incoming_message(established_state, msg.message).await;
Self::process_cast_error(&self.state, result, ctx);
} else {
debug!("Connection not yet established");
}
}
#[send_handler]
async fn handle_outgoing_message(
&mut self,
msg: peer_connection_server_protocol::OutgoingMessage,
ctx: &Context<Self>,
) {
if let ConnectionState::Established(ref mut established_state) = self.state {
trace!(
peer=%established_state.node,
message=%msg.message,
"Received outgoing request",
);
let result = handle_outgoing_message(established_state, msg.message).await;
Self::process_cast_error(&self.state, result, ctx);
} else {
debug!("Connection not yet established");
}
}
#[send_handler]
async fn handle_outgoing_request(
&mut self,
msg: peer_connection_server_protocol::OutgoingRequest,
ctx: &Context<Self>,
) {
if let ConnectionState::Established(ref mut established_state) = self.state {
trace!(
peer=%established_state.node,
message=%msg.message,
"Received outgoing request",
);
let Some(sender) = Arc::<oneshot::Sender<Message>>::into_inner(msg.sender) else {
debug!("Could not obtain sender channel: Arc has multiple references");
return;
};
let result = handle_outgoing_request(established_state, msg.message, sender).await;
Self::process_cast_error(&self.state, result, ctx);
} else {
debug!("Connection not yet established");
}
}
#[send_handler]
async fn handle_request_timeout(
&mut self,
msg: peer_connection_server_protocol::RequestTimeout,
_ctx: &Context<Self>,
) {
if let ConnectionState::Established(ref mut established_state) = self.state {
if let Some((msg_type, _)) = established_state.current_requests.remove(&msg.id) {
debug!(
peer=%established_state.node,
%msg_type,
id=%msg.id,
"Request timedout",
);
}
} else {
debug!("Connection not yet established");
}
}
#[send_handler]
async fn handle_send_ping(
&mut self,
_msg: peer_connection_server_protocol::SendPing,
ctx: &Context<Self>,
) {
if let ConnectionState::Established(ref mut established_state) = self.state {
let result = send(established_state, Message::Ping(PingMessage {})).await;
Self::process_cast_error(&self.state, result, ctx);
} else {
debug!("Connection not yet established");
}
}
#[send_handler]
async fn handle_block_range_update(
&mut self,
_msg: peer_connection_server_protocol::BlockRangeUpdate,
ctx: &Context<Self>,
) {
if let ConnectionState::Established(ref mut established_state) = self.state {
trace!(
peer=%established_state.node,
"Block Range Update"
);
let result = handle_block_range_update(established_state).await;
Self::process_cast_error(&self.state, result, ctx);
} else {
debug!("Connection not yet established");
}
}
#[send_handler]
async fn handle_sweep_inflight_txs(
&mut self,
_msg: peer_connection_server_protocol::SweepInflightTxs,
_ctx: &Context<Self>,
) {
if let ConnectionState::Established(ref mut state) = self.state {
let now = Instant::now();
let stale_ids: Vec<u64> = state
.requested_pooled_txs
.iter()
.filter(|(_, (_, _, ts))| now.duration_since(*ts) > INFLIGHT_TX_TIMEOUT)
.map(|(id, _)| *id)
.collect();
for id in stale_ids {
if let Some((_announced, hashes, _)) = state.requested_pooled_txs.remove(&id) {
if let Err(e) = state.blockchain.mempool.clear_in_flight_txs(&hashes) {
warn!(error = %e, "Failed to clear in-flight transaction tracking while sweeping stale requests");
}
retry_on_alternates(&state.blockchain, &state.peer_table, &hashes).await;
}
}
}
}
#[send_handler]
async fn handle_flush_pending_tx_requests(
&mut self,
_msg: peer_connection_server_protocol::FlushPendingTxRequests,
ctx: &Context<Self>,
) {
if let ConnectionState::Established(ref mut established_state) = self.state {
let result = flush_pending_tx_requests(established_state).await;
Self::process_cast_error(&self.state, result, ctx);
}
}
#[send_handler]
async fn handle_enqueue_tx_requests(
&mut self,
msg: peer_connection_server_protocol::EnqueueTxRequests,
_ctx: &Context<Self>,
) {
if let ConnectionState::Established(ref mut state) = self.state {
let to_request: Vec<H256> = match state.blockchain.mempool.reserve_unknown_hashes(
&msg.announcement.transaction_hashes,
&msg.announcement.transaction_types,
&msg.announcement.transaction_sizes,
state.node.node_id(),
) {
Ok(unknown) => unknown,
Err(_) => return,
};
if to_request.is_empty() {
return;
}
let trimmed = msg.announcement.filter_to(&to_request);
state.pending_tx_requests.push((trimmed, to_request));
}
}
#[send_handler]
async fn handle_broadcast_message(
&mut self,
msg: peer_connection_server_protocol::BroadcastMessage,
ctx: &Context<Self>,
) {
if let ConnectionState::Established(ref mut established_state) = self.state {
trace!(
peer=%established_state.node,
message=%msg.msg,
"Received broadcasted message",
);
let result = handle_broadcast(established_state, (msg.task_id, msg.msg)).await;
Self::process_cast_error(&self.state, result, ctx);
} else {
debug!("Connection not yet established");
}
}
#[cfg(feature = "l2")]
#[send_handler]
async fn handle_l2_message(&mut self, msg: L2Message, ctx: &Context<Self>) {
if let ConnectionState::Established(ref mut established_state) = self.state {
let peer_supports_l2 = established_state.l2_state.connection_state().is_ok();
let result = if peer_supports_l2 {
trace!(
peer=%established_state.node,
message=?msg.msg,
"Handling cast for L2 msg"
);
match msg.msg {
L2Cast::BatchBroadcast => {
let res = l2_connection::send_sealed_batch(established_state).await;
res.and(l2_connection::process_batches_on_queue(established_state).await)
}
L2Cast::BlockBroadcast => {
let res = l2_connection::send_new_block(established_state).await;
res.and(l2_connection::process_blocks_on_queue(established_state).await)
}
}
} else {
Err(PeerConnectionError::MessageNotHandled(
"Unknown message or capability not handled".to_string(),
))
};
Self::process_cast_error(&self.state, result, ctx);
} else {
debug!("Connection not yet established");
}
}
fn process_cast_error(
state: &ConnectionState,
result: Result<(), PeerConnectionError>,
ctx: &Context<Self>,
) {
if let Err(e) = result
&& let ConnectionState::Established(established_state) = state
{
match e {
PeerConnectionError::Disconnected
| PeerConnectionError::DisconnectReceived(_)
| PeerConnectionError::DisconnectSent(_)
| PeerConnectionError::HandshakeError(_)
| PeerConnectionError::NoMatchingCapabilities
| PeerConnectionError::InvalidPeerId
| PeerConnectionError::InvalidMessageLength
| PeerConnectionError::StateError(_)
| PeerConnectionError::InvalidRecoveryId => {
trace!(peer=%established_state.node, error=e.to_string(), "Peer connection error");
ctx.stop();
}
PeerConnectionError::IoError(ref io_e)
if io_e.kind() == std::io::ErrorKind::BrokenPipe =>
{
debug!(peer=%established_state.node, "Broken pipe with peer, disconnected");
ctx.stop();
}
PeerConnectionError::StoreError(StoreError::Trie(TrieError::InconsistentTree(
_,
))) => {
if established_state.blockchain.is_synced() {
error!(
peer=%established_state.node,
error=%e,
"Inconsistent trie while serving peer request; local state may be corrupted",
);
} else {
trace!(
peer=%established_state.node,
error=%e,
"Error handling cast message",
);
}
}
_ => {
debug!(
peer=%established_state.node,
capabilities=?established_state.capabilities,
error=%e,
"Error handling cast message",
);
}
}
}
}
}
async fn initialize_connection<S>(
ctx: &Context<PeerConnectionServer>,
state: &mut Established,
mut stream: S,
eth_version: Arc<RwLock<EthCapVersion>>,
) -> Result<(), PeerConnectionError>
where
S: Unpin + Send + Stream<Item = Result<Message, PeerConnectionError>> + 'static,
{
if state.peer_table.target_peers_reached().await? {
debug!(peer=%state.node, "Reached target peer connections, discarding.");
return Err(PeerConnectionError::TooManyPeers);
}
exchange_hello_messages(state, &mut stream).await?;
let version = match &state.negotiated_eth_capability {
Some(cap) if cap == &Capability::eth(68) => EthCapVersion::V68,
Some(cap) if cap == &Capability::eth(69) => EthCapVersion::V69,
Some(cap) if cap == &Capability::eth(70) => EthCapVersion::V70,
Some(cap) if cap == &Capability::eth(71) => EthCapVersion::V71,
_ => EthCapVersion::default(),
};
*eth_version
.write()
.map_err(|err| PeerConnectionError::InternalError(err.to_string()))? = version;
init_capabilities(state, &mut stream).await?;
let mut connection = PeerConnection {
handle: ctx.actor_ref(),
};
state.peer_table.new_connected_peer(
state.node.clone(),
connection.clone(),
state.capabilities.clone(),
)?;
trace!(peer=%state.node, "Peer connection initialized.");
send_all_pooled_tx_hashes(state, &mut connection).await?;
send_interval(
PING_INTERVAL,
ctx.clone(),
peer_connection_server_protocol::SendPing,
);
send_interval(
BLOCK_RANGE_UPDATE_INTERVAL,
ctx.clone(),
peer_connection_server_protocol::BlockRangeUpdate,
);
send_interval(
INFLIGHT_TX_SWEEP_INTERVAL,
ctx.clone(),
peer_connection_server_protocol::SweepInflightTxs,
);
send_interval(
TX_REQUEST_BATCH_INTERVAL,
ctx.clone(),
peer_connection_server_protocol::FlushPendingTxRequests,
);
#[cfg(feature = "l2")]
if state.l2_state.connection_state().is_ok() {
send_interval(
PERIODIC_BLOCK_BROADCAST_INTERVAL,
ctx.clone(),
L2Message {
msg: L2Cast::BlockBroadcast,
},
);
send_interval(
PERIODIC_BATCH_BROADCAST_INTERVAL,
ctx.clone(),
L2Message {
msg: L2Cast::BatchBroadcast,
},
);
}
spawn_listener(
ctx.clone(),
stream.filter_map(|result| match result {
Ok(msg) => Some(peer_connection_server_protocol::IncomingMessage { message: msg }),
Err(e) => {
debug!(error=?e, "Error receiving RLPx message");
None
}
}),
);
if state.negotiated_eth_capability.is_some() {
let stream: BroadcastStream<(Id, Arc<Message>)> =
BroadcastStream::new(state.connection_broadcast_send.subscribe());
let message_stream = stream.filter_map(|result| {
result.ok().map(
|(id, msg)| peer_connection_server_protocol::BroadcastMessage { task_id: id, msg },
)
});
spawn_listener(ctx.clone(), message_stream);
}
Ok(())
}
async fn send_all_pooled_tx_hashes(
state: &mut Established,
connection: &mut PeerConnection,
) -> Result<(), PeerConnectionError> {
let txs: Vec<MempoolTransaction> = state
.blockchain
.mempool
.get_all_txs_by_sender()?
.into_values()
.flatten()
.filter(|tx| !tx.is_privileged())
.collect();
if !txs.is_empty() {
state
.tx_broadcaster
.add_txs(
txs.iter().map(|tx| tx.hash()).collect(),
state.node.node_id(),
)
.map_err(|e| PeerConnectionError::BroadcastError(e.to_string()))?;
send_tx_hashes(
txs,
state.capabilities.clone(),
connection,
state.node.node_id(),
&state.blockchain,
)
.await
.map_err(|e| PeerConnectionError::SendMessage(e.to_string()))?;
}
Ok(())
}
async fn send_block_range_update(state: &mut Established) -> Result<(), PeerConnectionError> {
if state
.negotiated_eth_capability
.as_ref()
.is_some_and(|eth| eth.version >= 69)
{
trace!(peer=%state.node, "Sending BlockRangeUpdate");
let update = BlockRangeUpdate::new(&state.storage).await?;
let lastet_block = update.latest_block;
send(state, Message::BlockRangeUpdate(update)).await?;
state.last_block_range_update_block = lastet_block - (lastet_block % 32);
}
Ok(())
}
async fn should_send_block_range_update(state: &Established) -> Result<bool, PeerConnectionError> {
let latest_block = state.storage.get_latest_block_number().await?;
if latest_block < state.last_block_range_update_block
|| latest_block - state.last_block_range_update_block >= 32
{
return Ok(true);
}
Ok(false)
}
async fn init_capabilities<S>(
state: &mut Established,
stream: &mut S,
) -> Result<(), PeerConnectionError>
where
S: Unpin + Stream<Item = Result<Message, PeerConnectionError>>,
{
if let Some(eth) = state.negotiated_eth_capability.clone() {
let status = match eth.version {
68 => Message::Status68(StatusMessage68::new(&state.storage).await?),
69 => Message::Status69(StatusMessage69::new(&state.storage).await?),
70 => Message::Status70(StatusMessage70::new(&state.storage).await?),
71 => Message::Status71(StatusMessage71::new(&state.storage).await?),
ver => {
return Err(PeerConnectionError::HandshakeError(format!(
"Invalid eth version {ver}"
)));
}
};
trace!(peer=%state.node, "Sending status");
send(state, status).await?;
let msg = match receive(stream).await {
Some(msg) => msg?,
None => return Err(PeerConnectionError::Disconnected),
};
match msg {
Message::Status68(msg_data) => {
trace!(peer=%state.node, "Received Status(68)");
backend::validate_status(msg_data, &state.storage, ð).await?
}
Message::Status69(msg_data) => {
trace!(peer=%state.node, "Received Status(69)");
backend::validate_status(msg_data, &state.storage, ð).await?
}
Message::Status70(msg_data) => {
trace!(peer=%state.node, "Received Status(70)");
backend::validate_status(msg_data, &state.storage, ð).await?
}
Message::Status71(msg_data) => {
trace!(peer=%state.node, "Received Status(71)");
backend::validate_status(msg_data, &state.storage, ð).await?
}
Message::Disconnect(disconnect) => {
return Err(PeerConnectionError::HandshakeError(format!(
"Peer disconnected due to: {}",
disconnect.reason()
)));
}
_ => {
return Err(PeerConnectionError::HandshakeError(
"Expected a Status message".to_string(),
));
}
}
}
Ok(())
}
async fn send_disconnect_message(state: &mut Established, reason: Option<DisconnectReason>) {
send(state, Message::Disconnect(DisconnectMessage { reason }))
.await
.unwrap_or_else(|_| {
debug!(
peer=%state.node,
?reason,
"Could not send Disconnect message",
);
});
}
async fn connection_failed(state: &mut Established, error_text: &str, error: &PeerConnectionError) {
debug!(
peer=%state.node,
%error_text,
%error,
"connection failure"
);
if !matches!(error, PeerConnectionError::DisconnectReceived(_)) {
send_disconnect_message(state, match_disconnect_reason(error)).await;
}
match error {
PeerConnectionError::DisconnectReceived(DisconnectReason::AlreadyConnected)
| PeerConnectionError::DisconnectSent(DisconnectReason::AlreadyConnected) => {
debug!(
peer=%state.node,
%error_text,
%error,
"Peer already connected, don't replace it"
);
}
_ => {
debug!(
peer=%state.node,
%error_text,
%error,
remote_public_key=%state.node.public_key,
"discarding peer",
);
}
}
}
fn match_disconnect_reason(error: &PeerConnectionError) -> Option<DisconnectReason> {
match error {
PeerConnectionError::DisconnectSent(reason) => Some(*reason),
PeerConnectionError::DisconnectReceived(reason) => Some(*reason),
PeerConnectionError::RLPDecodeError(_) => Some(DisconnectReason::NetworkError),
PeerConnectionError::TooManyPeers => Some(DisconnectReason::TooManyPeers),
_ => None,
}
}
async fn exchange_hello_messages<S>(
state: &mut Established,
stream: &mut S,
) -> Result<(), PeerConnectionError>
where
S: Unpin + Stream<Item = Result<Message, PeerConnectionError>>,
{
#[allow(unused_mut)]
let mut supported_capabilities: Vec<Capability> = [
&SUPPORTED_ETH_CAPABILITIES[..],
&SUPPORTED_SNAP_CAPABILITIES[..],
]
.concat();
#[cfg(feature = "l2")]
if state.l2_state.is_supported() {
supported_capabilities.push(crate::rlpx::l2::SUPPORTED_BASED_CAPABILITIES[0].clone());
}
let hello_msg = Message::Hello(p2p::HelloMessage::new(
supported_capabilities,
PublicKey::from_secret_key(secp256k1::SECP256K1, &state.signer),
state.client_version.clone(),
));
send(state, hello_msg).await?;
let msg = match receive(stream).await {
Some(msg) => msg?,
None => return Err(PeerConnectionError::Disconnected),
};
match msg {
Message::Hello(hello_message) => {
let mut negotiated_eth_version = 0;
let mut negotiated_snap_version = 0;
trace!(
peer=%state.node,
capabilities=?hello_message.capabilities,
"Hello message capabilities",
);
for cap in &hello_message.capabilities {
match cap.protocol() {
"eth" => {
if SUPPORTED_ETH_CAPABILITIES.contains(cap)
&& cap.version > negotiated_eth_version
{
negotiated_eth_version = cap.version;
}
}
"snap" => {
if SUPPORTED_SNAP_CAPABILITIES.contains(cap)
&& cap.version > negotiated_snap_version
{
negotiated_snap_version = cap.version;
}
}
#[cfg(feature = "l2")]
"based" if state.l2_state.is_supported() => {
state.l2_state.set_established()?;
}
_ => {}
}
}
state.capabilities = hello_message.capabilities;
if negotiated_eth_version == 0 {
return Err(PeerConnectionError::NoMatchingCapabilities);
}
debug!("Negotiated eth version: eth/{}", negotiated_eth_version);
state.negotiated_eth_capability = Some(Capability::eth(negotiated_eth_version));
if negotiated_snap_version != 0 {
debug!("Negotiated snap version: snap/{}", negotiated_snap_version);
state.negotiated_snap_capability = Some(Capability::snap(negotiated_snap_version));
}
state.node.version = Some(hello_message.client_id);
Ok(())
}
Message::Disconnect(disconnect) => {
Err(PeerConnectionError::DisconnectReceived(disconnect.reason()))
}
_ => {
Err(PeerConnectionError::BadRequest(
"Expected Hello message".to_string(),
))
}
}
}
pub(crate) async fn send(
state: &mut Established,
message: Message,
) -> Result<(), PeerConnectionError> {
#[cfg(feature = "metrics")]
{
use ethrex_metrics::p2p::METRICS_P2P;
METRICS_P2P.inc_outgoing_message(message.metric_label());
}
state.sink.send(message).await
}
async fn receive<S>(stream: &mut S) -> Option<Result<Message, PeerConnectionError>>
where
S: Unpin + Stream<Item = Result<Message, PeerConnectionError>>,
{
stream.next().await
}
fn check_serve_request_rate(state: &mut Established) -> bool {
let now = Instant::now();
if now.duration_since(state.serve_request_window_start) >= SERVE_REQUEST_WINDOW {
state.serve_request_window_start = now;
state.serve_requests_in_window = 0;
}
state.serve_requests_in_window += 1;
state.serve_requests_in_window <= MAX_SERVE_REQUESTS_PER_WINDOW
}
async fn handle_incoming_message(
state: &mut Established,
message: Message,
) -> Result<(), PeerConnectionError> {
#[cfg(feature = "metrics")]
{
use ethrex_metrics::p2p::METRICS_P2P;
METRICS_P2P.inc_incoming_message(message.metric_label());
}
let is_data_request = matches!(
message,
Message::GetBlockHeaders(_)
| Message::GetBlockBodies(_)
| Message::GetReceipts68(_)
| Message::GetReceipts69(_)
| Message::GetReceipts70(_)
| Message::GetPooledTransactions(_)
| Message::GetAccountRange(_)
| Message::GetStorageRanges(_)
| Message::GetByteCodes(_)
| Message::GetTrieNodes(_)
);
if is_data_request && !check_serve_request_rate(state) {
debug!(
peer = %state.node,
window_requests = state.serve_requests_in_window,
"Disconnecting peer: exceeded incoming request rate limit",
);
send_disconnect_message(state, Some(DisconnectReason::UselessPeer)).await;
return Err(PeerConnectionError::DisconnectSent(
DisconnectReason::UselessPeer,
));
}
let peer_supports_eth = state.negotiated_eth_capability.is_some();
#[cfg(feature = "l2")]
let peer_supports_l2 = state.l2_state.connection_state().is_ok();
match message {
Message::Disconnect(msg_data) => {
let reason = msg_data.reason();
trace!(
peer=%state.node,
?reason,
"Received Disconnect"
);
state.disconnect_reason = Some(reason);
return Err(PeerConnectionError::DisconnectReceived(reason));
}
Message::Ping(_) => {
trace!(peer=%state.node, "Sending pong message");
send(state, Message::Pong(PongMessage {})).await?;
}
Message::Pong(_) => {
}
Message::Status68(msg_data) => {
if let Some(eth) = &state.negotiated_eth_capability {
backend::validate_status(msg_data, &state.storage, eth).await?
};
}
Message::Status69(msg_data) => {
if let Some(eth) = &state.negotiated_eth_capability {
backend::validate_status(msg_data, &state.storage, eth).await?
};
}
Message::Status70(msg_data) => {
if let Some(eth) = &state.negotiated_eth_capability {
backend::validate_status(msg_data, &state.storage, eth).await?
};
}
Message::Status71(msg_data) => {
if let Some(eth) = &state.negotiated_eth_capability {
backend::validate_status(msg_data, &state.storage, eth).await?
};
}
Message::GetAccountRange(req) => {
let response = process_account_range_request(req, state.storage.clone()).await?;
send(state, Message::AccountRange(response)).await?
}
Message::Transactions(txs) if peer_supports_eth => {
if !txs.transactions.is_empty() {
state.received_txs_from_peer = true;
}
if state.blockchain.is_synced() {
let tx_hashes: Vec<_> = txs.transactions.iter().map(|tx| tx.hash()).collect();
let blockchain = state.blockchain.clone();
let peer = state.node.to_string();
#[cfg(feature = "l2")]
let is_l2_mode = state.l2_state.is_supported();
tokio::spawn(async move {
for tx in txs.transactions {
#[cfg(feature = "l2")]
if (is_l2_mode && matches!(tx, Transaction::EIP4844Transaction(_)))
|| tx.is_privileged()
{
let tx_type = tx.tx_type();
debug!(peer=%peer, "Rejecting transaction in L2 mode - {tx_type} transactions are not broadcasted in L2");
continue;
}
if let Err(e) = blockchain.add_transaction_to_pool(tx).await {
debug!(
peer=%peer,
error=%e,
"Error adding transaction"
);
}
}
});
state
.tx_broadcaster
.add_txs(tx_hashes, state.node.node_id())
.map_err(|e| PeerConnectionError::BroadcastError(e.to_string()))?;
}
}
Message::GetBlockHeaders(msg_data) if peer_supports_eth => {
let response = BlockHeaders {
id: msg_data.id,
block_headers: msg_data.fetch_headers(&state.storage).await,
};
send(state, Message::BlockHeaders(response)).await?;
}
Message::GetBlockBodies(msg_data) if peer_supports_eth => {
let response = BlockBodies {
id: msg_data.id,
block_bodies: msg_data.fetch_blocks(&state.storage).await,
};
send(state, Message::BlockBodies(response)).await?;
}
Message::GetBlockAccessLists(GetBlockAccessLists { id, block_hashes })
if peer_supports_eth =>
{
use crate::rlpx::eth::block_access_lists::BLOCK_ACCESS_LIST_LIMIT;
let mut block_access_lists =
Vec::with_capacity(block_hashes.len().min(BLOCK_ACCESS_LIST_LIMIT));
for hash in &block_hashes {
let bal = match state.storage.get_block_access_list(*hash) {
Ok(Some(bal)) => {
let commitment = match state.storage.get_block_header_by_hash(*hash) {
Ok(Some(header)) => header.block_access_list_hash,
Ok(None) => None,
Err(err) => {
warn!(
"Failed to read header for BAL commitment check (hash {hash:#x}): {err}; reporting BAL unavailable"
);
None
}
};
bal.matches_commitment(commitment).then_some(bal)
}
Ok(None) => None,
Err(err) => {
error!("Error accessing DB while building BAL response for peer: {err}");
None
}
};
block_access_lists.push(bal);
if block_access_lists.len() >= BLOCK_ACCESS_LIST_LIMIT {
break;
}
}
let response = BlockAccessLists::new(id, block_access_lists);
send(state, Message::BlockAccessLists(response)).await?;
}
Message::GetReceipts68(GetReceipts68 { id, block_hashes }) if peer_supports_eth => {
let mut receipts = Vec::new();
for hash in block_hashes.iter() {
receipts.push(state.storage.get_receipts_for_block(hash).await?);
}
send(state, Message::Receipts68(Receipts68::new(id, receipts))).await?;
}
Message::GetReceipts69(GetReceipts68 { id, block_hashes }) if peer_supports_eth => {
let mut receipts = Vec::new();
for hash in block_hashes.iter() {
receipts.push(state.storage.get_receipts_for_block(hash).await?);
}
send(state, Message::Receipts69(Receipts69::new(id, receipts))).await?;
}
Message::GetReceipts70(GetReceipts70 {
id,
first_block_receipt_index,
block_hashes,
}) if peer_supports_eth => {
let block_hashes = &block_hashes[..block_hashes.len().min(256)];
let mut all_receipts: Vec<Vec<Receipt>> = Vec::new();
let mut total_size: usize = 0;
let mut last_block_incomplete = false;
for (i, hash) in block_hashes.iter().enumerate() {
let start_index = if i == 0 { first_block_receipt_index } else { 0 };
let block_receipts = state
.storage
.get_receipts_for_block_from_index(hash, start_index, None)
.await?;
let mut block_receipt_list = Vec::new();
let mut hit_limit = false;
for receipt in block_receipts {
let receipt_size = receipt.length();
if total_size + receipt_size > SOFT_RESPONSE_LIMIT
&& (!block_receipt_list.is_empty() || !all_receipts.is_empty())
{
hit_limit = true;
if !block_receipt_list.is_empty() {
last_block_incomplete = true;
}
break;
}
total_size += receipt_size;
block_receipt_list.push(receipt);
}
if !block_receipt_list.is_empty() || !hit_limit {
all_receipts.push(block_receipt_list);
}
if hit_limit {
break;
}
}
let response =
Message::Receipts70(Receipts70::new(id, last_block_incomplete, all_receipts));
send(state, response).await?;
}
Message::BlockRangeUpdate(update) => {
trace!(
peer=%state.node,
range_from=update.earliest_block,
range_to=update.latest_block,
"Block range update",
);
if let Err(err) = update.validate() {
debug!(
peer=%state.node,
reason=%err,
"Disconnecting peer: invalid block range update",
);
send_disconnect_message(state, Some(DisconnectReason::SubprotocolError)).await;
return Err(PeerConnectionError::DisconnectSent(
DisconnectReason::SubprotocolError,
));
}
}
Message::NewPooledTransactionHashes(new_pooled_transaction_hashes) if peer_supports_eth => {
if state.blockchain.is_synced() {
let hashes = new_pooled_transaction_hashes
.get_transactions_to_request(&state.blockchain, state.node.node_id())?;
if !hashes.is_empty() {
state
.pending_tx_requests
.push((new_pooled_transaction_hashes, hashes));
}
}
}
Message::GetPooledTransactions(msg) => {
let response = msg.handle(&state.blockchain)?;
let batch_size = response.pooled_transactions.len() as u64;
if state.txs_sent_to_peer + batch_size > LEECH_TX_SENT_THRESHOLD
&& !state.received_txs_from_peer
{
debug!(
peer = %state.node,
txs_sent = state.txs_sent_to_peer,
"Disconnecting peer: leech detected (sent many txs but received none)",
);
send_disconnect_message(state, Some(DisconnectReason::UselessPeer)).await;
return Err(PeerConnectionError::DisconnectSent(
DisconnectReason::UselessPeer,
));
}
send(state, Message::PooledTransactions(response)).await?;
state.txs_sent_to_peer += batch_size;
}
Message::PooledTransactions(msg) if peer_supports_eth => {
if !msg.pooled_transactions.is_empty() {
state.received_txs_from_peer = true;
}
let removed_request = state.requested_pooled_txs.remove(&msg.id);
if let Some((_, ref requested_hashes, _)) = removed_request {
state
.blockchain
.mempool
.clear_in_flight_txs(requested_hashes)?;
}
for tx in &msg.pooled_transactions {
if let P2PTransaction::EIP4844TransactionWithBlobs(itx) = tx
&& (itx.blobs_bundle.is_empty()
|| itx
.blobs_bundle
.validate_blob_commitment_hashes(&itx.tx.blob_versioned_hashes)
.is_err())
{
debug!(
peer=%state.node,
"Disconnecting peer: invalid or missing blobs",
);
if let Some((_announced, requested_hashes, _)) = &removed_request {
retry_on_alternates(&state.blockchain, &state.peer_table, requested_hashes)
.await;
}
send_disconnect_message(state, Some(DisconnectReason::SubprotocolError)).await;
return Err(PeerConnectionError::DisconnectSent(
DisconnectReason::SubprotocolError,
));
}
}
if state.blockchain.is_synced() {
if let Some((announced, requested_hashes, _)) = &removed_request {
let fork = state.blockchain.current_fork().await?;
if let Err(error) = msg.validate_requested(announced, fork) {
debug!(
peer=%state.node,
reason=%error,
"Disconnecting peer: invalid pooled transactions response",
);
retry_on_alternates(&state.blockchain, &state.peer_table, requested_hashes)
.await;
send_disconnect_message(state, Some(DisconnectReason::SubprotocolError))
.await;
return Err(PeerConnectionError::DisconnectSent(
DisconnectReason::SubprotocolError,
));
}
}
#[cfg(feature = "l2")]
let is_l2_mode = state.l2_state.is_supported();
#[cfg(not(feature = "l2"))]
let is_l2_mode = false;
if let Err(error) = msg.handle(&state.node, &state.blockchain, is_l2_mode).await {
if matches!(
error,
ethrex_blockchain::error::MempoolError::BlobsBundleError(_)
) {
debug!(
peer=%state.node,
reason=%error,
"Disconnecting peer: invalid pooled transactions response",
);
if let Some((_announced, requested_hashes, _)) = &removed_request {
retry_on_alternates(
&state.blockchain,
&state.peer_table,
requested_hashes,
)
.await;
}
send_disconnect_message(state, Some(DisconnectReason::SubprotocolError))
.await;
return Err(PeerConnectionError::DisconnectSent(
DisconnectReason::SubprotocolError,
));
}
return Err(error.into());
}
}
}
Message::GetStorageRanges(req) => {
let response = process_storage_ranges_request(req, state.storage.clone()).await?;
send(state, Message::StorageRanges(response)).await?
}
Message::GetByteCodes(req) => {
let storage_clone = state.storage.clone();
let response = process_byte_codes_request(req, storage_clone)
.await
.map_err(|_| {
PeerConnectionError::InternalError(
"Failed to execute bytecode retrieval task".to_string(),
)
})?;
send(state, Message::ByteCodes(response)).await?
}
Message::GetTrieNodes(req) => {
let id = req.id;
match process_trie_nodes_request(req, state.storage.clone()).await {
Ok(response) => send(state, Message::TrieNodes(response)).await?,
Err(_) => send(state, Message::TrieNodes(TrieNodes { id, nodes: vec![] })).await?,
}
}
#[cfg(feature = "l2")]
Message::L2(req) if peer_supports_l2 => {
handle_based_capability_message(state, req).await?;
}
message @ Message::AccountRange(_)
| message @ Message::StorageRanges(_)
| message @ Message::ByteCodes(_)
| message @ Message::TrieNodes(_)
| message @ Message::BlockBodies(_)
| message @ Message::BlockHeaders(_)
| message @ Message::Receipts68(_)
| message @ Message::Receipts69(_)
| message @ Message::Receipts70(_)
| message @ Message::BlockAccessLists(_) => {
if let Some((_, tx)) = message
.request_id()
.and_then(|id| state.current_requests.remove(&id))
{
tx.send(message)
.map_err(|e| PeerConnectionError::SendMessage(e.to_string()))?
} else {
return Err(PeerConnectionError::ExpectedRequestId(format!("{message}")));
}
}
message => return Err(PeerConnectionError::MessageNotHandled(format!("{message}"))),
};
Ok(())
}
async fn handle_outgoing_message(
state: &mut Established,
message: Message,
) -> Result<(), PeerConnectionError> {
trace!(
peer=%state.node,
%message,
"Sending message"
);
send(state, message).await?;
Ok(())
}
async fn handle_outgoing_request(
state: &mut Established,
message: Message,
sender: oneshot::Sender<Message>,
) -> Result<(), PeerConnectionError> {
message.request_id().and_then(|id| {
state
.current_requests
.insert(id, (format!("{message}"), sender))
});
trace!(
peer=%state.node,
%message,
"Sending request"
);
send(state, message).await?;
Ok(())
}
async fn handle_broadcast(
state: &mut Established,
(id, broadcasted_msg): (task::Id, Arc<Message>),
) -> Result<(), PeerConnectionError> {
if id != tokio::task::id() {
match broadcasted_msg.as_ref() {
#[cfg(feature = "l2")]
l2_msg @ Message::L2(_) => {
handle_l2_broadcast(state, l2_msg).await?;
}
msg => {
error!(
peer=%state.node,
message=%msg,
"Non-supported message broadcasted"
);
let error_message = format!("Non-supported message broadcasted: {msg}");
return Err(PeerConnectionError::BroadcastError(error_message));
}
}
}
Ok(())
}
async fn handle_block_range_update(state: &mut Established) -> Result<(), PeerConnectionError> {
if should_send_block_range_update(state).await? {
send_block_range_update(state).await
} else {
Ok(())
}
}
async fn flush_pending_tx_requests(state: &mut Established) -> Result<(), PeerConnectionError> {
if state.pending_tx_requests.is_empty() {
return Ok(());
}
let pending = std::mem::take(&mut state.pending_tx_requests);
let mut all_hashes: Vec<H256> = Vec::new();
let mut all_types: Vec<u8> = Vec::new();
let mut all_sizes: Vec<usize> = Vec::new();
for (announcement, hashes) in &pending {
let trimmed = announcement.filter_to(hashes);
all_hashes.extend_from_slice(&trimmed.transaction_hashes);
all_types.extend_from_slice(&trimmed.transaction_types);
all_sizes.extend(trimmed.transaction_sizes);
}
const MAX_HASHES_PER_REQUEST: usize = 256;
for (i, chunk) in all_hashes.chunks(MAX_HASHES_PER_REQUEST).enumerate() {
let offset = i * MAX_HASHES_PER_REQUEST;
let chunk_types = &all_types[offset..offset + chunk.len()];
let chunk_sizes = &all_sizes[offset..offset + chunk.len()];
let announcement = NewPooledTransactionHashes::from_raw(
chunk_types.to_vec().into(),
chunk_sizes.to_vec(),
chunk.to_vec(),
);
let request = GetPooledTransactions::new(random(), chunk.to_vec());
let request_id = request.id;
if let Err(e) = send(state, Message::GetPooledTransactions(request)).await {
let unsent = &all_hashes[offset..];
if !unsent.is_empty() {
if let Err(clear_err) = state.blockchain.mempool.clear_in_flight_txs(unsent) {
warn!(error = %clear_err, "Failed to clear in-flight transaction tracking after send error");
}
retry_on_alternates(&state.blockchain, &state.peer_table, unsent).await;
}
return Err(e);
}
state
.requested_pooled_txs
.insert(request_id, (announcement, chunk.to_vec(), Instant::now()));
}
Ok(())
}
async fn retry_on_alternates(
blockchain: &Arc<Blockchain>,
peer_table: &PeerTable,
hashes: &[H256],
) {
if hashes.is_empty() {
return;
}
type AltGroup = (PeerConnection, Vec<(H256, u8, usize)>);
let mut by_peer: FxHashMap<H256, AltGroup> = FxHashMap::default();
for hash in hashes {
loop {
let alt = match blockchain.mempool.pop_alternate(*hash) {
Ok(Some(a)) => a,
Ok(None) => break,
Err(e) => {
warn!(error = %e, "pop_alternate failed");
break;
}
};
if let Some((_, list)) = by_peer.get_mut(&alt.peer_id) {
list.push((*hash, alt.tx_type, alt.tx_size));
break;
}
match peer_table.get_peer_connection(alt.peer_id).await {
Ok(Some(conn)) => {
by_peer.insert(alt.peer_id, (conn, vec![(*hash, alt.tx_type, alt.tx_size)]));
break;
}
Ok(None) => continue, Err(e) => {
warn!(error = %e, "get_peer_connection failed");
break;
}
}
}
}
for (_, (conn, entries)) in by_peer {
let mut types = Vec::with_capacity(entries.len());
let mut sizes = Vec::with_capacity(entries.len());
let mut hash_list = Vec::with_capacity(entries.len());
for (h, t, s) in &entries {
hash_list.push(*h);
types.push(*t);
sizes.push(*s);
}
let announcement =
NewPooledTransactionHashes::from_raw(types.into(), sizes, hash_list.clone());
if let Err(e) = conn.enqueue_tx_requests(announcement, hash_list) {
debug!(error = %e, "Failed to enqueue tx requests on alternate peer");
}
}
}