use super::filter::{Filter, FilterConfig};
use crate::{
ipmode::to_ipv4_mapped, metrics::METRICS, node_info::NodeAddress, packet::*, Executor,
};
use parking_lot::RwLock;
use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
sync::Arc,
time::Duration,
};
use tokio::{
net::UdpSocket,
sync::{mpsc, oneshot},
};
use tracing::{debug, trace, warn};
/// A decoded inbound packet, as forwarded by the `RecvHandler` over its channel once the
/// packet has passed the filter.
pub struct InboundPacket {
/// Socket address the datagram was received from. IPv4-mapped IPv6 addresses are converted to
/// their IPv4 form before the packet is forwarded.
pub src_address: SocketAddr,
/// Header of the decoded packet.
pub header: PacketHeader,
/// The `message` portion of the decoded packet.
pub message: Vec<u8>,
/// Authenticated data returned by `Packet::decode` alongside the packet.
pub authenticated_data: Vec<u8>,
}
/// Everything required by `RecvHandler::spawn` to set up and start the receive task.
pub struct RecvHandlerConfig {
/// Configuration handed to the packet `Filter`. Its `enabled` flag also decides whether the
/// receive loop periodically prunes the filter's rate limiter.
pub filter_config: FilterConfig,
/// Passed to `Filter::new` together with `filter_config`.
/// NOTE(review): presumably the default length of bans applied by the filter — confirm
/// against `Filter`.
pub ban_duration: Option<Duration>,
/// Executor on which the receive task is spawned.
pub executor: Box<dyn Executor>,
/// UDP socket the receive task reads datagrams from.
pub recv: Arc<UdpSocket>,
/// Local node id, supplied to `Packet::decode` when decoding inbound datagrams.
pub local_node_id: enr::NodeId,
/// Packets from socket addresses present as keys in this map bypass both filter passes.
/// NOTE(review): the `usize` value is never read by the receive task; presumably a count of
/// outstanding requests per address — confirm against the code that populates the map.
pub expected_responses: Arc<RwLock<HashMap<SocketAddr, usize>>>,
}
/// State of the task that receives UDP datagrams, filters and decodes them, and forwards the
/// resulting `InboundPacket`s over a channel.
pub(crate) struct RecvHandler {
/// UDP socket datagrams are read from.
recv: Arc<UdpSocket>,
/// Packets from socket addresses present as keys in this map skip the filter entirely.
expected_responses: Arc<RwLock<HashMap<SocketAddr, usize>>>,
/// Packet filter, applied in two passes: once on the source address before decoding and once
/// on the decoded packet.
filter: Filter,
/// Buffer each datagram is received into; reused for every packet.
recv_buffer: [u8; MAX_PACKET_SIZE],
/// Local node id, passed to `Packet::decode`.
node_id: enr::NodeId,
/// Sending half of the channel on which accepted packets are delivered.
handler: mpsc::Sender<InboundPacket>,
/// Resolves when the receive task should shut down.
exit: oneshot::Receiver<()>,
}
impl RecvHandler {
    /// Builds a `RecvHandler` from `config` and spawns its receive loop on `config.executor`.
    ///
    /// Returns the receiving half of the channel on which accepted [`InboundPacket`]s are
    /// delivered, together with a oneshot sender. The receive loop stops as soon as that sender
    /// is used or dropped.
    pub(crate) fn spawn(
        config: RecvHandlerConfig,
    ) -> (mpsc::Receiver<InboundPacket>, oneshot::Sender<()>) {
        let (exit_sender, exit) = oneshot::channel();

        // Read the flag before `filter_config` is moved into the filter below.
        let filter_enabled = config.filter_config.enabled;

        // Bounded channel: once 30 packets are queued, the receive loop waits for the consumer.
        let (handler, handler_recv) = mpsc::channel(30);

        let mut recv_handler = RecvHandler {
            recv: config.recv,
            filter: Filter::new(config.filter_config, config.ban_duration),
            recv_buffer: [0; MAX_PACKET_SIZE],
            node_id: config.local_node_id,
            expected_responses: config.expected_responses,
            handler,
            exit,
        };

        config.executor.spawn(Box::pin(async move {
            debug!("Recv handler starting");
            recv_handler.start(filter_enabled).await;
        }));

        (handler_recv, exit_sender)
    }

    /// Runs the receive loop until the exit channel resolves.
    ///
    /// Each iteration waits for whichever happens first: an inbound datagram, the 30 second
    /// prune tick for the filter's rate limiter (only when `filter_enabled` is true), or the
    /// shutdown signal.
    async fn start(&mut self, filter_enabled: bool) {
        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));

        loop {
            tokio::select! {
                // Bind the whole result and match inside the handler. A refutable `Ok(..)`
                // branch pattern would disable this branch for the current `select!` call when
                // the socket reports an error, pausing packet reception until another branch
                // completes (the next prune tick, or shutdown if the filter is disabled).
                result = self.recv.recv_from(&mut self.recv_buffer) => {
                    match result {
                        Ok((length, src)) => {
                            METRICS.add_recv_bytes(length);
                            self.handle_inbound(src, length).await;
                        }
                        Err(e) => {
                            debug!("Failed to receive from the UDP socket: {}", e);
                        }
                    }
                }
                _ = interval.tick(), if filter_enabled => {
                    self.filter.prune_limiter();
                },
                // Resolves both when the exit sender is used and when it is dropped.
                _ = &mut self.exit => {
                    debug!("Recv handler shutdown");
                    return;
                }
            }
        }
    }

    /// Filters and decodes the datagram held in the first `length` bytes of `recv_buffer`,
    /// forwarding it to the handler channel if it is accepted.
    ///
    /// `src_address` is the address the datagram was received from. Packets that are filtered
    /// out or fail to decode are dropped.
    async fn handle_inbound(&mut self, mut src_address: SocketAddr, length: usize) {
        // Report IPv4-mapped IPv6 sources by their IPv4 address.
        if let IpAddr::V6(ip6) = src_address.ip() {
            if let Some(ip4) = to_ipv4_mapped(&ip6) {
                trace!("Mapping inbound packet addr from {} to {}", ip6, ip4);
                src_address.set_ip(ip4.into())
            }
        }

        // Sources we are expecting a response from bypass both filter passes. The read guard is
        // released at the end of this statement, before any await point.
        let permitted = self.expected_responses.read().contains_key(&src_address);

        // First filter pass: address-level checks, performed before paying for decoding.
        if !permitted && !self.filter.initial_pass(&src_address) {
            trace!("Packet filtered from source: {:?}", src_address);
            return;
        }

        let (packet, authenticated_data) =
            match Packet::decode(&self.node_id, &self.recv_buffer[..length]) {
                Ok(decoded) => decoded,
                Err(e) => {
                    // Undecodable packets are dropped.
                    debug!("Packet decoding failed: {:?}", e);
                    return;
                }
            };

        // Second filter pass: only possible for packets that expose the sender's node id
        // (`src_id()` returns `Some`).
        if let Some(node_id) = packet.src_id() {
            let node_address = NodeAddress {
                socket_addr: src_address,
                node_id,
            };
            if !permitted && !self.filter.final_pass(&node_address, &packet) {
                return;
            }
        }

        let inbound = InboundPacket {
            src_address,
            header: packet.header,
            message: packet.message,
            authenticated_data,
        };

        // Forward the accepted packet; a failed send is only logged.
        self.handler
            .send(inbound)
            .await
            .unwrap_or_else(|e| warn!("Could not send packet to handler: {}", e));
    }
}