discv5 0.1.0

Implementation of the p2p discv5 discovery protocol
Documentation
//! This is a standalone task that handles UDP packets as they are received.
//!
//! Every UDP packet passes a filter before being processed.

use super::filter::{Filter, FilterConfig};
use crate::{
    ipmode::to_ipv4_mapped, metrics::METRICS, node_info::NodeAddress, packet::*, Executor,
};
use parking_lot::RwLock;
use std::{
    collections::HashMap,
    net::{IpAddr, SocketAddr},
    sync::Arc,
    time::Duration,
};
use tokio::{
    net::UdpSocket,
    sync::{mpsc, oneshot},
};

use tracing::{debug, trace, warn};

/// The object sent back by the Recv handler.
pub struct InboundPacket {
    /// The originating socket addr.
    pub src_address: SocketAddr,
    /// The packet header.
    pub header: PacketHeader,
    /// The message of the packet.
    pub message: Vec<u8>,
    /// The authenticated data of the packet.
    pub authenticated_data: Vec<u8>,
}

/// Convenience objects for setting up the recv handler.
pub struct RecvHandlerConfig {
    pub filter_config: FilterConfig,
    /// If the filter is enabled this sets the default timeout for bans enacted by the filter.
    pub ban_duration: Option<Duration>,
    pub executor: Box<dyn Executor>,
    pub recv: Arc<UdpSocket>,
    pub local_node_id: enr::NodeId,
    pub expected_responses: Arc<RwLock<HashMap<SocketAddr, usize>>>,
}

/// The main task that handles inbound UDP packets.
pub(crate) struct RecvHandler {
    /// The UDP recv socket.
    recv: Arc<UdpSocket>,
    /// The list of waiting responses. These are used to allow incoming packets from sources
    /// that we are expected a response from bypassing the rate-limit filters.
    expected_responses: Arc<RwLock<HashMap<SocketAddr, usize>>>,
    /// The packet filter which decides whether to accept or reject inbound packets.
    filter: Filter,
    /// The buffer to accept inbound datagrams.
    recv_buffer: [u8; MAX_PACKET_SIZE],
    /// The local node id used to decrypt headers of messages.
    node_id: enr::NodeId,
    /// The channel to send the packet handler.
    handler: mpsc::Sender<InboundPacket>,
    /// Exit channel to shutdown the recv handler.
    exit: oneshot::Receiver<()>,
}

impl RecvHandler {
    /// Spawns the `RecvHandler` on a provided executor.
    pub(crate) fn spawn(
        config: RecvHandlerConfig,
    ) -> (mpsc::Receiver<InboundPacket>, oneshot::Sender<()>) {
        let (exit_sender, exit) = oneshot::channel();

        let filter_enabled = config.filter_config.enabled;

        // create the channel to send decoded packets to the handler
        let (handler, handler_recv) = mpsc::channel(30);

        let mut recv_handler = RecvHandler {
            recv: config.recv,
            filter: Filter::new(config.filter_config, config.ban_duration),
            recv_buffer: [0; MAX_PACKET_SIZE],
            node_id: config.local_node_id,
            expected_responses: config.expected_responses,
            handler,
            exit,
        };

        // start the handler
        config.executor.spawn(Box::pin(async move {
            debug!("Recv handler starting");
            recv_handler.start(filter_enabled).await;
        }));
        (handler_recv, exit_sender)
    }

    /// The main future driving the recv handler. This will shutdown when the exit future is fired.
    async fn start(&mut self, filter_enabled: bool) {
        // Interval to prune to rate limiter.
        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));

        loop {
            tokio::select! {
                Ok((length, src)) = self.recv.recv_from(&mut self.recv_buffer) => {
                    METRICS.add_recv_bytes(length);
                    self.handle_inbound(src, length).await;
                }
                _ = interval.tick(), if filter_enabled => {
                    self.filter.prune_limiter();
                },
                _ = &mut self.exit => {
                    debug!("Recv handler shutdown");
                    return;
                }
            }
        }
    }

    /// Handles in incoming packet. Passes through the filter, decodes and sends to the packet
    /// handler.
    async fn handle_inbound(&mut self, mut src_address: SocketAddr, length: usize) {
        // Make sure ip4 addresses in dual stack nodes are reported correctly
        if let IpAddr::V6(ip6) = src_address.ip() {
            // NOTE: here we don't want to use the `to_ipv4` method, since it also includes compat
            // addresses, which are deprecated. Dual stack nodes will report ipv4 addresses as
            // mapped addresses and not compat, so this is not needed. This also messes with some
            // valid ipv6 local addresses (for example, the ipv6 loopback address). Ideally what we
            // want is `to_canonical`.
            if let Some(ip4) = to_ipv4_mapped(&ip6) {
                trace!("Mapping inbound packet addr from {} to {}", ip6, ip4);
                src_address.set_ip(ip4.into())
            }
        }
        // Permit all expected responses
        let permitted = self.expected_responses.read().get(&src_address).is_some();

        // Perform the first run of the filter. This checks for rate limits and black listed IP
        // addresses.
        if !permitted && !self.filter.initial_pass(&src_address) {
            trace!("Packet filtered from source: {:?}", src_address);
            return;
        }
        // Decodes the packet
        let (packet, authenticated_data) =
            match Packet::decode(&self.node_id, &self.recv_buffer[..length]) {
                Ok(p) => p,
                Err(e) => {
                    debug!("Packet decoding failed: {:?}", e); // could not decode the packet, drop it
                    return;
                }
            };

        // If this is not a challenge packet, we immediately know its src_id and so pass it
        // through the second filter.
        if let Some(node_id) = packet.src_id() {
            // Construct the node address
            let node_address = NodeAddress {
                socket_addr: src_address,
                node_id,
            };

            // Perform packet-level filtering
            if !permitted && !self.filter.final_pass(&node_address, &packet) {
                return;
            }
        }

        let inbound = InboundPacket {
            src_address,
            header: packet.header,
            message: packet.message,
            authenticated_data,
        };

        // send the filtered decoded packet to the handler.
        self.handler
            .send(inbound)
            .await
            .unwrap_or_else(|e| warn!("Could not send packet to handler: {}", e));
    }
}