1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
//! This is a standalone task that handles UDP packets as they are received.
//!
//! Every UDP packet passes a filter before being processed.

use super::filter::{Filter, FilterConfig};
use crate::{metrics::METRICS, node_info::NodeAddress, packet::*, Executor};
use parking_lot::RwLock;
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use tokio::{
    net::UdpSocket,
    sync::{mpsc, oneshot},
};

use tracing::{debug, trace, warn};

/// The object sent back by the Recv handler.
pub struct InboundPacket {
    /// The originating socket addr.
    pub src_address: SocketAddr,
    /// The packet header.
    pub header: PacketHeader,
    /// The message of the packet.
    pub message: Vec<u8>,
    /// The authenticated data of the packet.
    pub authenticated_data: Vec<u8>,
}

/// Convenience objects for setting up the recv handler.
pub struct RecvHandlerConfig {
    pub filter_config: FilterConfig,
    /// If the filter is enabled this sets the default timeout for bans enacted by the filter.
    pub ban_duration: Option<Duration>,
    pub executor: Box<dyn Executor>,
    pub recv: Arc<UdpSocket>,
    pub second_recv: Option<Arc<UdpSocket>>,
    pub local_node_id: enr::NodeId,
    pub expected_responses: Arc<RwLock<HashMap<SocketAddr, usize>>>,
}

/// The main task that handles inbound UDP packets.
pub(crate) struct RecvHandler {
    /// The UDP recv socket.
    recv: Arc<UdpSocket>,
    /// An option second UDP socket. Used when dialing over both Ipv4 and Ipv6.
    second_recv: Option<Arc<UdpSocket>>,
    /// Simple hack to alternate reading from the first or the second socket.
    /// The list of waiting responses. These are used to allow incoming packets from sources
    /// that we are expected a response from bypassing the rate-limit filters.
    expected_responses: Arc<RwLock<HashMap<SocketAddr, usize>>>,
    /// The packet filter which decides whether to accept or reject inbound packets.
    filter: Filter,
    /// The local node id used to decrypt headers of messages.
    node_id: enr::NodeId,
    /// The channel to send the packet handler.
    handler: mpsc::Sender<InboundPacket>,
    /// Exit channel to shutdown the recv handler.
    exit: oneshot::Receiver<()>,
}

impl RecvHandler {
    /// Spawns the `RecvHandler` on a provided executor.
    pub(crate) fn spawn<P: ProtocolIdentity>(
        config: RecvHandlerConfig,
    ) -> (mpsc::Receiver<InboundPacket>, oneshot::Sender<()>) {
        let (exit_sender, exit) = oneshot::channel();
        let RecvHandlerConfig {
            filter_config,
            ban_duration,
            executor,
            recv,
            second_recv,
            local_node_id,
            expected_responses,
        } = config;

        let filter_enabled = filter_config.enabled;

        // create the channel to send decoded packets to the handler
        let (handler, handler_recv) = mpsc::channel(30);

        let mut recv_handler = RecvHandler {
            recv,
            second_recv,
            expected_responses,
            filter: Filter::new(filter_config, ban_duration),
            node_id: local_node_id,
            handler,
            exit,
        };

        // start the handler
        executor.spawn(Box::pin(async move {
            debug!("Recv handler starting");
            recv_handler.start::<P>(filter_enabled).await;
        }));
        (handler_recv, exit_sender)
    }

    /// The main future driving the recv handler. This will shutdown when the exit future is fired.
    async fn start<P: ProtocolIdentity>(&mut self, filter_enabled: bool) {
        // Interval to prune to rate limiter.
        let mut interval = tokio::time::interval(Duration::from_secs(30));
        let mut first_buffer = [0; MAX_PACKET_SIZE];
        let mut second_buffer = [0; MAX_PACKET_SIZE];
        use futures::future::OptionFuture;
        // We want to completely deactivate this branch of the select when there is no second
        // socket to receive from.
        let check_second_recv = self.second_recv.is_some();

        loop {
            tokio::select! {
                Ok((length, src)) = self.recv.recv_from(&mut first_buffer) => {
                    METRICS.add_recv_bytes(length);
                    self.handle_inbound::<P>(src, length, &first_buffer).await;
                }
                Some(Ok((length, src))) = Into::<OptionFuture<_>>::into(self.second_recv.as_ref().map(|second_recv|second_recv.recv_from(&mut second_buffer))), if check_second_recv => {
                    METRICS.add_recv_bytes(length);
                    self.handle_inbound::<P>(src, length, &second_buffer).await;
                }
                _ = interval.tick(), if filter_enabled => {
                    self.filter.prune_limiter();
                },
                _ = &mut self.exit => {
                    debug!("Recv handler shutdown");
                    return;
                }
            }
        }
    }

    /// Handles in incoming packet. Passes through the filter, decodes and sends to the packet
    /// handler.
    async fn handle_inbound<P: ProtocolIdentity>(
        &mut self,
        mut src_address: SocketAddr,
        length: usize,
        recv_buffer: &[u8; MAX_PACKET_SIZE],
    ) {
        // Zero out the flowinfo and scope id of v6 socket addresses.
        //
        // Flowinfo contains both the Flow label and Traffic Class. These should be ignored by
        // nodes that do not support them when receiving packets according to
        // [RFC 2460 section 6](https://datatracker.ietf.org/doc/html/rfc2460#section-6) and
        // [RFC 2460 section 7](https://datatracker.ietf.org/doc/html/rfc2460#section-7)
        //
        // Excerpt from section 6
        // > Hosts or routers that do not support the functions of the Flow Label field are
        // > required to set the field to zero when originating a packet, pass the field on unchanged
        // > when forwarding a packet, and ignore the field when receiving a packet.
        //
        // Excerpt from section 7
        // > Nodes should ignore and leave unchanged any bits of the Traffic Class field for which
        // > they do not support a specific use.
        //
        // Since we do not forward this information, it's safe to zero it out.
        //
        // The scope id usage is formalized in [RFC 4007](https://www.rfc-editor.org/rfc/rfc4007)
        // and it's necessary to make link local ipv6 addresses routable.
        //
        // Since the Enr has no way to communicate scope ids, we opt for zeroing this data in order
        // to facilitate connectivity for nodes with a link-local address with an only interface.
        //
        // At the same time, we accept the risk of colission of nodes in a topology where there are
        // multiple interfaces and two nodes with the same link-local address. This risk is small
        // based in additional checks to packets.
        if let SocketAddr::V6(ref mut v6_socket_addr) = src_address {
            if v6_socket_addr.flowinfo() != 0 || v6_socket_addr.scope_id() != 0 {
                trace!("Zeroing out flowinfo and scope_id for v6 socket address. Original {v6_socket_addr}");
                v6_socket_addr.set_flowinfo(0);
                v6_socket_addr.set_scope_id(0);
            }
        }

        // Permit all expected responses
        let permitted = self.expected_responses.read().get(&src_address).is_some();

        // Perform the first run of the filter. This checks for rate limits and black listed IP
        // addresses.
        if !permitted && !self.filter.initial_pass(&src_address) {
            trace!("Packet filtered from source: {:?}", src_address);
            return;
        }
        // Decodes the packet
        let (packet, authenticated_data) =
            match Packet::decode::<P>(&self.node_id, &recv_buffer[..length]) {
                Ok(p) => p,
                Err(e) => {
                    debug!("Packet decoding failed: {:?}", e); // could not decode the packet, drop it
                    return;
                }
            };

        // If this is not a challenge packet, we immediately know its src_id and so pass it
        // through the second filter.
        if let Some(node_id) = packet.src_id() {
            // Construct the node address
            let node_address = NodeAddress {
                socket_addr: src_address,
                node_id,
            };

            // Perform packet-level filtering
            if !permitted && !self.filter.final_pass(&node_address, &packet) {
                return;
            }
        }

        let inbound = InboundPacket {
            src_address,
            header: packet.header,
            message: packet.message,
            authenticated_data,
        };

        // send the filtered decoded packet to the handler.
        self.handler
            .send(inbound)
            .await
            .unwrap_or_else(|e| warn!("Could not send packet to handler: {}", e));
    }
}