1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
//! This is a standalone task that handles UDP packets as they are received.
//!
//! Every UDP packet passes a filter before being processed.
use super::filter::{Filter, FilterConfig};
use crate::{metrics::METRICS, node_info::NodeAddress, packet::*, Executor};
use parking_lot::RwLock;
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use tokio::{
net::UdpSocket,
sync::{mpsc, oneshot},
};
use tracing::{debug, trace, warn};
/// The object sent back by the Recv handler.
pub struct InboundPacket {
/// The originating socket addr.
pub src_address: SocketAddr,
/// The packet header.
pub header: PacketHeader,
/// The message of the packet.
pub message: Vec<u8>,
/// The authenticated data of the packet.
pub authenticated_data: Vec<u8>,
}
/// Convenience objects for setting up the recv handler.
pub struct RecvHandlerConfig {
pub filter_config: FilterConfig,
/// If the filter is enabled this sets the default timeout for bans enacted by the filter.
pub ban_duration: Option<Duration>,
pub executor: Box<dyn Executor>,
pub recv: Arc<UdpSocket>,
pub second_recv: Option<Arc<UdpSocket>>,
pub local_node_id: enr::NodeId,
pub expected_responses: Arc<RwLock<HashMap<SocketAddr, usize>>>,
}
/// The main task that handles inbound UDP packets.
pub(crate) struct RecvHandler {
/// The UDP recv socket.
recv: Arc<UdpSocket>,
/// An option second UDP socket. Used when dialing over both Ipv4 and Ipv6.
second_recv: Option<Arc<UdpSocket>>,
/// Simple hack to alternate reading from the first or the second socket.
/// The list of waiting responses. These are used to allow incoming packets from sources
/// that we are expected a response from bypassing the rate-limit filters.
expected_responses: Arc<RwLock<HashMap<SocketAddr, usize>>>,
/// The packet filter which decides whether to accept or reject inbound packets.
filter: Filter,
/// The local node id used to decrypt headers of messages.
node_id: enr::NodeId,
/// The channel to send the packet handler.
handler: mpsc::Sender<InboundPacket>,
/// Exit channel to shutdown the recv handler.
exit: oneshot::Receiver<()>,
}
impl RecvHandler {
/// Spawns the `RecvHandler` on a provided executor.
pub(crate) fn spawn<P: ProtocolIdentity>(
config: RecvHandlerConfig,
) -> (mpsc::Receiver<InboundPacket>, oneshot::Sender<()>) {
let (exit_sender, exit) = oneshot::channel();
let RecvHandlerConfig {
filter_config,
ban_duration,
executor,
recv,
second_recv,
local_node_id,
expected_responses,
} = config;
let filter_enabled = filter_config.enabled;
// create the channel to send decoded packets to the handler
let (handler, handler_recv) = mpsc::channel(30);
let mut recv_handler = RecvHandler {
recv,
second_recv,
expected_responses,
filter: Filter::new(filter_config, ban_duration),
node_id: local_node_id,
handler,
exit,
};
// start the handler
executor.spawn(Box::pin(async move {
debug!("Recv handler starting");
recv_handler.start::<P>(filter_enabled).await;
}));
(handler_recv, exit_sender)
}
/// The main future driving the recv handler. This will shutdown when the exit future is fired.
async fn start<P: ProtocolIdentity>(&mut self, filter_enabled: bool) {
// Interval to prune to rate limiter.
let mut interval = tokio::time::interval(Duration::from_secs(30));
let mut first_buffer = [0; MAX_PACKET_SIZE];
let mut second_buffer = [0; MAX_PACKET_SIZE];
use futures::future::OptionFuture;
// We want to completely deactivate this branch of the select when there is no second
// socket to receive from.
let check_second_recv = self.second_recv.is_some();
loop {
tokio::select! {
Ok((length, src)) = self.recv.recv_from(&mut first_buffer) => {
METRICS.add_recv_bytes(length);
self.handle_inbound::<P>(src, length, &first_buffer).await;
}
Some(Ok((length, src))) = Into::<OptionFuture<_>>::into(self.second_recv.as_ref().map(|second_recv|second_recv.recv_from(&mut second_buffer))), if check_second_recv => {
METRICS.add_recv_bytes(length);
self.handle_inbound::<P>(src, length, &second_buffer).await;
}
_ = interval.tick(), if filter_enabled => {
self.filter.prune_limiter();
},
_ = &mut self.exit => {
debug!("Recv handler shutdown");
return;
}
}
}
}
/// Handles in incoming packet. Passes through the filter, decodes and sends to the packet
/// handler.
async fn handle_inbound<P: ProtocolIdentity>(
&mut self,
mut src_address: SocketAddr,
length: usize,
recv_buffer: &[u8; MAX_PACKET_SIZE],
) {
// Zero out the flowinfo and scope id of v6 socket addresses.
//
// Flowinfo contains both the Flow label and Traffic Class. These should be ignored by
// nodes that do not support them when receiving packets according to
// [RFC 2460 section 6](https://datatracker.ietf.org/doc/html/rfc2460#section-6) and
// [RFC 2460 section 7](https://datatracker.ietf.org/doc/html/rfc2460#section-7)
//
// Excerpt from section 6
// > Hosts or routers that do not support the functions of the Flow Label field are
// > required to set the field to zero when originating a packet, pass the field on unchanged
// > when forwarding a packet, and ignore the field when receiving a packet.
//
// Excerpt from section 7
// > Nodes should ignore and leave unchanged any bits of the Traffic Class field for which
// > they do not support a specific use.
//
// Since we do not forward this information, it's safe to zero it out.
//
// The scope id usage is formalized in [RFC 4007](https://www.rfc-editor.org/rfc/rfc4007)
// and it's necessary to make link local ipv6 addresses routable.
//
// Since the Enr has no way to communicate scope ids, we opt for zeroing this data in order
// to facilitate connectivity for nodes with a link-local address with an only interface.
//
// At the same time, we accept the risk of colission of nodes in a topology where there are
// multiple interfaces and two nodes with the same link-local address. This risk is small
// based in additional checks to packets.
if let SocketAddr::V6(ref mut v6_socket_addr) = src_address {
if v6_socket_addr.flowinfo() != 0 || v6_socket_addr.scope_id() != 0 {
trace!("Zeroing out flowinfo and scope_id for v6 socket address. Original {v6_socket_addr}");
v6_socket_addr.set_flowinfo(0);
v6_socket_addr.set_scope_id(0);
}
}
// Permit all expected responses
let permitted = self.expected_responses.read().get(&src_address).is_some();
// Perform the first run of the filter. This checks for rate limits and black listed IP
// addresses.
if !permitted && !self.filter.initial_pass(&src_address) {
trace!("Packet filtered from source: {:?}", src_address);
return;
}
// Decodes the packet
let (packet, authenticated_data) =
match Packet::decode::<P>(&self.node_id, &recv_buffer[..length]) {
Ok(p) => p,
Err(e) => {
debug!("Packet decoding failed: {:?}", e); // could not decode the packet, drop it
return;
}
};
// If this is not a challenge packet, we immediately know its src_id and so pass it
// through the second filter.
if let Some(node_id) = packet.src_id() {
// Construct the node address
let node_address = NodeAddress {
socket_addr: src_address,
node_id,
};
// Perform packet-level filtering
if !permitted && !self.filter.final_pass(&node_address, &packet) {
return;
}
}
let inbound = InboundPacket {
src_address,
header: packet.header,
message: packet.message,
authenticated_data,
};
// send the filtered decoded packet to the handler.
self.handler
.send(inbound)
.await
.unwrap_or_else(|e| warn!("Could not send packet to handler: {}", e));
}
}