snarkos_node_router/
inbound.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Outbound,
18    Peer,
19    messages::{
20        BlockRequest,
21        BlockResponse,
22        DataBlocks,
23        Message,
24        PeerResponse,
25        Ping,
26        Pong,
27        UnconfirmedSolution,
28        UnconfirmedTransaction,
29    },
30};
31use snarkos_node_tcp::protocols::Reading;
32use snarkvm::prelude::{
33    Network,
34    block::{Block, Header, Transaction},
35    puzzle::Solution,
36};
37
38use anyhow::{Result, anyhow, bail};
39use snarkos_node_tcp::is_bogon_ip;
40use std::net::SocketAddr;
41use tokio::task::spawn_blocking;
42
43/// The max number of peers to send in a `PeerResponse` message.
44const MAX_PEERS_TO_SEND: usize = u8::MAX as usize;
45
46/// The maximum number of blocks the client can be behind it's latest peer before it skips
47/// processing incoming transactions and solutions.
48pub const SYNC_LENIENCY: u32 = 10;
49
50#[async_trait]
51pub trait Inbound<N: Network>: Reading + Outbound<N> {
52    /// The maximum number of puzzle requests per interval.
53    const MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL: usize = 5;
54    /// The maximum number of block requests per interval.
55    const MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL: usize = 256;
56    /// The duration in seconds to sleep in between ping requests with a connected peer.
57    const PING_SLEEP_IN_SECS: u64 = 20; // 20 seconds
58    /// The time frame to enforce the `MESSAGE_LIMIT`.
59    const MESSAGE_LIMIT_TIME_FRAME_IN_SECS: i64 = 5;
60    /// The maximum number of messages accepted within `MESSAGE_LIMIT_TIME_FRAME_IN_SECS`.
61    const MESSAGE_LIMIT: usize = 500;
62
63    /// Handles the inbound message from the peer.
64    async fn inbound(&self, peer_addr: SocketAddr, message: Message<N>) -> Result<()> {
65        // Retrieve the listener IP for the peer.
66        let peer_ip = match self.router().resolve_to_listener(&peer_addr) {
67            Some(peer_ip) => peer_ip,
68            None => bail!("Unable to resolve the (ambiguous) peer address '{peer_addr}'"),
69        };
70
71        // Drop the peer, if they have sent more than `MESSAGE_LIMIT` messages
72        // in the last `MESSAGE_LIMIT_TIME_FRAME_IN_SECS` seconds.
73        let num_messages = self.router().cache.insert_inbound_message(peer_ip, Self::MESSAGE_LIMIT_TIME_FRAME_IN_SECS);
74        if num_messages > Self::MESSAGE_LIMIT {
75            bail!("Dropping '{peer_ip}' for spamming messages (num_messages = {num_messages})")
76        }
77
78        trace!("Received '{}' from '{peer_ip}'", message.name());
79
80        // Update the last seen timestamp of the peer.
81        self.router().update_last_seen_for_connected_peer(peer_ip);
82
83        // This match statement handles the inbound message by deserializing the message,
84        // checking that the message is valid, and then calling the appropriate (trait) handler.
85        match message {
86            Message::BlockRequest(message) => {
87                let BlockRequest { start_height, end_height } = &message;
88                // Insert the block request for the peer, and fetch the recent frequency.
89                let frequency = self.router().cache.insert_inbound_block_request(peer_ip);
90                // Check if the number of block requests is within the limit.
91                if frequency > Self::MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL {
92                    bail!("Peer '{peer_ip}' is not following the protocol (excessive block requests)")
93                }
94                // Ensure the block request is well-formed.
95                if start_height >= end_height {
96                    bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
97                }
98                // Ensure that the block request is within the allowed bounds.
99                if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
100                    bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
101                }
102
103                let node = self.clone();
104                match spawn_blocking(move || node.block_request(peer_ip, message)).await? {
105                    true => Ok(()),
106                    false => bail!("Peer '{peer_ip}' sent an invalid block request"),
107                }
108            }
109            Message::BlockResponse(message) => {
110                let BlockResponse { request, blocks } = message;
111
112                // Remove the block request, checking if this node previously sent a block request to this peer.
113                if !self.router().cache.remove_outbound_block_request(peer_ip, &request) {
114                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected block response)")
115                }
116                // Perform the deferred non-blocking deserialization of the blocks.
117                // The deserialization can take a long time (minutes). We should not be running
118                // this on a blocking task, but on a rayon thread pool.
119                let (send, recv) = tokio::sync::oneshot::channel();
120                rayon::spawn_fifo(move || {
121                    let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
122                    let _ = send.send(blocks);
123                });
124                let blocks = match recv.await {
125                    Ok(Ok(blocks)) => blocks,
126                    Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
127                    Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
128                };
129
130                // Ensure the block response is well-formed.
131                blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
132
133                // Process the block response.
134                let node = self.clone();
135                match spawn_blocking(move || node.block_response(peer_ip, blocks.0)).await? {
136                    true => Ok(()),
137                    false => bail!("Peer '{peer_ip}' sent an invalid block response"),
138                }
139            }
140            Message::ChallengeRequest(..) | Message::ChallengeResponse(..) => {
141                // Disconnect as the peer is not following the protocol.
142                bail!("Peer '{peer_ip}' is not following the protocol")
143            }
144            Message::Disconnect(message) => {
145                bail!("{:?}", message.reason)
146            }
147            Message::PeerRequest(..) => match self.peer_request(peer_ip) {
148                true => Ok(()),
149                false => bail!("Peer '{peer_ip}' sent an invalid peer request"),
150            },
151            Message::PeerResponse(message) => {
152                if !self.router().cache.contains_outbound_peer_request(peer_ip) {
153                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected peer response)")
154                }
155                self.router().cache.decrement_outbound_peer_requests(peer_ip);
156                if !self.router().allow_external_peers() {
157                    bail!("Not accepting peer response from '{peer_ip}' (validator gossip is disabled)");
158                }
159
160                match self.peer_response(peer_ip, &message.peers) {
161                    true => Ok(()),
162                    false => bail!("Peer '{peer_ip}' sent an invalid peer response"),
163                }
164            }
165            Message::Ping(message) => {
166                // Ensure the message protocol version is not outdated.
167                if message.version < Message::<N>::VERSION {
168                    bail!("Dropping '{peer_ip}' on message version {} (outdated)", message.version);
169                }
170
171                // If the peer is a client or validator, ensure there are block locators.
172                let is_client_or_validator = message.node_type.is_client() || message.node_type.is_validator();
173                if is_client_or_validator && message.block_locators.is_none() {
174                    bail!("Peer '{peer_ip}' is a {}, but no block locators were provided", message.node_type);
175                }
176                // If the peer is a prover, ensure there are no block locators.
177                else if message.node_type.is_prover() && message.block_locators.is_some() {
178                    bail!("Peer '{peer_ip}' is a prover or client, but block locators were provided");
179                }
180
181                // Update the connected peer.
182                if let Err(error) =
183                    self.router().update_connected_peer(peer_ip, message.node_type, |peer: &mut Peer<N>| {
184                        // Update the version of the peer.
185                        peer.set_version(message.version);
186                        // Update the node type of the peer.
187                        peer.set_node_type(message.node_type);
188                    })
189                {
190                    bail!("[Ping] {error}");
191                }
192
193                // Process the ping message.
194                match self.ping(peer_ip, message) {
195                    true => Ok(()),
196                    false => bail!("Peer '{peer_ip}' sent an invalid ping"),
197                }
198            }
199            Message::Pong(message) => match self.pong(peer_ip, message) {
200                true => Ok(()),
201                false => bail!("Peer '{peer_ip}' sent an invalid pong"),
202            },
203            Message::PuzzleRequest(..) => {
204                // Insert the puzzle request for the peer, and fetch the recent frequency.
205                let frequency = self.router().cache.insert_inbound_puzzle_request(peer_ip);
206                // Check if the number of puzzle requests is within the limit.
207                if frequency > Self::MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL {
208                    bail!("Peer '{peer_ip}' is not following the protocol (excessive puzzle requests)")
209                }
210                // Process the puzzle request.
211                match self.puzzle_request(peer_ip) {
212                    true => Ok(()),
213                    false => bail!("Peer '{peer_ip}' sent an invalid puzzle request"),
214                }
215            }
216            Message::PuzzleResponse(message) => {
217                // Check that this node previously sent a puzzle request to this peer.
218                if !self.router().cache.contains_outbound_puzzle_request(&peer_ip) {
219                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected puzzle response)")
220                }
221                // Decrement the number of puzzle requests.
222                self.router().cache.decrement_outbound_puzzle_requests(peer_ip);
223
224                // Perform the deferred non-blocking deserialization of the block header.
225                let header = match message.block_header.deserialize().await {
226                    Ok(header) => header,
227                    Err(error) => bail!("[PuzzleResponse] {error}"),
228                };
229                // Process the puzzle response.
230                match self.puzzle_response(peer_ip, message.epoch_hash, header) {
231                    true => Ok(()),
232                    false => bail!("Peer '{peer_ip}' sent an invalid puzzle response"),
233                }
234            }
235            Message::UnconfirmedSolution(message) => {
236                // Do not process unconfirmed solutions if the node is too far behind.
237                if self.num_blocks_behind() > SYNC_LENIENCY {
238                    trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id);
239                    return Ok(());
240                }
241                // Update the timestamp for the unconfirmed solution.
242                let seen_before = self.router().cache.insert_inbound_solution(peer_ip, message.solution_id).is_some();
243                // Determine whether to propagate the solution.
244                if seen_before {
245                    trace!("Skipping 'UnconfirmedSolution' from '{peer_ip}'");
246                    return Ok(());
247                }
248                // Clone the serialized message.
249                let serialized = message.clone();
250                // Perform the deferred non-blocking deserialization of the solution.
251                let solution = match message.solution.deserialize().await {
252                    Ok(solution) => solution,
253                    Err(error) => bail!("[UnconfirmedSolution] {error}"),
254                };
255                // Check that the solution parameters match.
256                if message.solution_id != solution.id() {
257                    bail!("Peer '{peer_ip}' is not following the 'UnconfirmedSolution' protocol")
258                }
259                // Handle the unconfirmed solution.
260                match self.unconfirmed_solution(peer_ip, serialized, solution).await {
261                    true => Ok(()),
262                    false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed solution"),
263                }
264            }
265            Message::UnconfirmedTransaction(message) => {
266                // Do not process unconfirmed transactions if the node is too far behind.
267                if self.num_blocks_behind() > SYNC_LENIENCY {
268                    trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id);
269                    return Ok(());
270                }
271                // Update the timestamp for the unconfirmed transaction.
272                let seen_before =
273                    self.router().cache.insert_inbound_transaction(peer_ip, message.transaction_id).is_some();
274                // Determine whether to propagate the transaction.
275                if seen_before {
276                    trace!("Skipping 'UnconfirmedTransaction' from '{peer_ip}'");
277                    return Ok(());
278                }
279                // Clone the serialized message.
280                let serialized = message.clone();
281                // Perform the deferred non-blocking deserialization of the transaction.
282                let transaction = match message.transaction.deserialize().await {
283                    Ok(transaction) => transaction,
284                    Err(error) => bail!("[UnconfirmedTransaction] {error}"),
285                };
286                // Check that the transaction parameters match.
287                if message.transaction_id != transaction.id() {
288                    bail!("Peer '{peer_ip}' is not following the 'UnconfirmedTransaction' protocol")
289                }
290                // Handle the unconfirmed transaction.
291                match self.unconfirmed_transaction(peer_ip, serialized, transaction).await {
292                    true => Ok(()),
293                    false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed transaction"),
294                }
295            }
296        }
297    }
298
299    /// Handles a `BlockRequest` message.
300    fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool;
301
302    /// Handles a `BlockResponse` message.
303    fn block_response(&self, peer_ip: SocketAddr, _blocks: Vec<Block<N>>) -> bool;
304
305    /// Handles a `PeerRequest` message.
306    fn peer_request(&self, peer_ip: SocketAddr) -> bool {
307        // Retrieve the connected peers.
308        let peers = self.router().connected_peers();
309        // Filter out invalid addresses.
310        let peers = match self.router().is_dev() {
311            // In development mode, relax the validity requirements to make operating devnets more flexible.
312            true => {
313                peers.into_iter().filter(|ip| *ip != peer_ip && !is_bogon_ip(ip.ip())).take(MAX_PEERS_TO_SEND).collect()
314            }
315            // In production mode, ensure the peer IPs are valid.
316            false => peers
317                .into_iter()
318                .filter(|ip| *ip != peer_ip && self.router().is_valid_peer_ip(ip))
319                .take(MAX_PEERS_TO_SEND)
320                .collect(),
321        };
322        // Send a `PeerResponse` message to the peer.
323        self.send(peer_ip, Message::PeerResponse(PeerResponse { peers }));
324        true
325    }
326
327    /// Handles a `PeerResponse` message.
328    fn peer_response(&self, _peer_ip: SocketAddr, peers: &[SocketAddr]) -> bool {
329        // Check if the number of peers received is less than MAX_PEERS_TO_SEND.
330        if peers.len() > MAX_PEERS_TO_SEND {
331            return false;
332        }
333        // Filter out invalid addresses.
334        let peers = match self.router().is_dev() {
335            // In development mode, relax the validity requirements to make operating devnets more flexible.
336            true => peers.iter().copied().filter(|ip| !is_bogon_ip(ip.ip())).collect::<Vec<_>>(),
337            // In production mode, ensure the peer IPs are valid.
338            false => peers.iter().copied().filter(|ip| self.router().is_valid_peer_ip(ip)).collect(),
339        };
340        // Adds the given peer IPs to the list of candidate peers.
341        self.router().insert_candidate_peers(&peers);
342        true
343    }
344
345    /// Handles a `Ping` message.
346    fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool;
347
348    /// Sleeps for a period and then sends a `Ping` message to the peer.
349    fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool;
350
351    /// Handles a `PuzzleRequest` message.
352    fn puzzle_request(&self, peer_ip: SocketAddr) -> bool;
353
354    /// Handles a `PuzzleResponse` message.
355    fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool;
356
357    /// Handles an `UnconfirmedSolution` message.
358    async fn unconfirmed_solution(
359        &self,
360        peer_ip: SocketAddr,
361        serialized: UnconfirmedSolution<N>,
362        solution: Solution<N>,
363    ) -> bool;
364
365    /// Handles an `UnconfirmedTransaction` message.
366    async fn unconfirmed_transaction(
367        &self,
368        peer_ip: SocketAddr,
369        serialized: UnconfirmedTransaction<N>,
370        _transaction: Transaction<N>,
371    ) -> bool;
372}