snarkos_node_router/
inbound.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Outbound,
18    messages::{
19        BlockRequest,
20        BlockResponse,
21        DataBlocks,
22        Message,
23        PeerResponse,
24        Ping,
25        Pong,
26        UnconfirmedSolution,
27        UnconfirmedTransaction,
28    },
29};
30use snarkos_node_tcp::protocols::Reading;
31use snarkvm::prelude::{
32    Network,
33    block::{Block, Header, Transaction},
34    puzzle::Solution,
35};
36
37use anyhow::{Result, anyhow, bail};
38use snarkos_node_tcp::is_bogon_ip;
39use std::net::SocketAddr;
40use tokio::task::spawn_blocking;
41
/// The max number of peers to send in a `PeerResponse` message.
///
/// Equal to `u8::MAX` (255). `Inbound::peer_response` also uses this value as the upper
/// bound on the number of peers it accepts from a single `PeerResponse`.
const MAX_PEERS_TO_SEND: usize = u8::MAX as usize;
44
45#[async_trait]
46pub trait Inbound<N: Network>: Reading + Outbound<N> {
47    /// The maximum number of puzzle requests per interval.
48    const MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL: usize = 5;
49    /// The maximum number of block requests per interval.
50    const MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL: usize = 256;
51    /// The duration in seconds to sleep in between ping requests with a connected peer.
52    const PING_SLEEP_IN_SECS: u64 = 20; // 20 seconds
53    /// The time frame to enforce the `MESSAGE_LIMIT`.
54    const MESSAGE_LIMIT_TIME_FRAME_IN_SECS: i64 = 5;
55    /// The maximum number of messages accepted within `MESSAGE_LIMIT_TIME_FRAME_IN_SECS`.
56    const MESSAGE_LIMIT: usize = 500;
57
58    /// Returns `true` if the message version is valid.
59    fn is_valid_message_version(&self, message_version: u32) -> bool;
60
61    /// Is the node synced enough to process unconfirmed transactions and solutions?
62    fn is_within_sync_leniency(&self) -> bool {
63        // The maximum number of blocks the client can be behind it's latest peer before it skips
64        // processing incoming transactions and solutions.
65        const SYNC_LENIENCY: u32 = 10;
66
67        if let Some(num) = self.num_blocks_behind() {
68            num <= SYNC_LENIENCY
69        } else {
70            // We have not received block locators yet.
71            true
72        }
73    }
74
75    /// Handles the inbound message from the peer. The returned value indicates whether
76    /// the connection is still active, and errors causing a disconnect once they are
77    /// propagated to the caller.
78    async fn inbound(&self, peer_addr: SocketAddr, message: Message<N>) -> Result<bool> {
79        // Retrieve the listener IP for the peer.
80        let peer_ip = match self.router().resolve_to_listener(&peer_addr) {
81            Some(peer_ip) => peer_ip,
82            None => {
83                // No longer connected to the peer.
84                trace!("Dropping a {} from {peer_addr} - no longer connected.", message.name());
85                return Ok(false);
86            }
87        };
88
89        // Drop the peer, if they have sent more than `MESSAGE_LIMIT` messages
90        // in the last `MESSAGE_LIMIT_TIME_FRAME_IN_SECS` seconds.
91        let num_messages = self.router().cache.insert_inbound_message(peer_ip, Self::MESSAGE_LIMIT_TIME_FRAME_IN_SECS);
92        if num_messages > Self::MESSAGE_LIMIT {
93            bail!("Dropping '{peer_ip}' for spamming messages (num_messages = {num_messages})")
94        }
95
96        trace!("Received '{}' from '{peer_ip}'", message.name());
97
98        // Update the last seen timestamp of the peer.
99        self.router().update_last_seen_for_connected_peer(peer_ip);
100
101        // This match statement handles the inbound message by deserializing the message,
102        // checking that the message is valid, and then calling the appropriate (trait) handler.
103        match message {
104            Message::BlockRequest(message) => {
105                let BlockRequest { start_height, end_height } = &message;
106                // Insert the block request for the peer, and fetch the recent frequency.
107                let frequency = self.router().cache.insert_inbound_block_request(peer_ip);
108                // Check if the number of block requests is within the limit.
109                if frequency > Self::MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL {
110                    bail!("Peer '{peer_ip}' is not following the protocol (excessive block requests)")
111                }
112                // Ensure the block request is well-formed.
113                if start_height >= end_height {
114                    bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
115                }
116                // Ensure that the block request is within the allowed bounds.
117                if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
118                    bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
119                }
120
121                let node = self.clone();
122                match spawn_blocking(move || node.block_request(peer_ip, message)).await? {
123                    true => Ok(true),
124                    false => bail!("Peer '{peer_ip}' sent an invalid block request"),
125                }
126            }
127            Message::BlockResponse(message) => {
128                let BlockResponse { request, blocks } = message;
129
130                // Remove the block request, checking if this node previously sent a block request to this peer.
131                if !self.router().cache.remove_outbound_block_request(peer_ip, &request) {
132                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected block response)")
133                }
134                // Perform the deferred non-blocking deserialization of the blocks.
135                // The deserialization can take a long time (minutes). We should not be running
136                // this on a blocking task, but on a rayon thread pool.
137                let (send, recv) = tokio::sync::oneshot::channel();
138                rayon::spawn_fifo(move || {
139                    let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
140                    let _ = send.send(blocks);
141                });
142                let blocks = match recv.await {
143                    Ok(Ok(blocks)) => blocks,
144                    Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
145                    Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
146                };
147
148                // Ensure the block response is well-formed.
149                blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
150
151                // Process the block response.
152                let node = self.clone();
153                match spawn_blocking(move || node.block_response(peer_ip, blocks.0)).await? {
154                    true => Ok(true),
155                    false => bail!("Peer '{peer_ip}' sent an invalid block response"),
156                }
157            }
158            Message::ChallengeRequest(..) | Message::ChallengeResponse(..) => {
159                // Disconnect as the peer is not following the protocol.
160                bail!("Peer '{peer_ip}' is not following the protocol")
161            }
162            Message::Disconnect(message) => {
163                // The peer informs us that they had disconnected. Disconnect from them too.
164                debug!("Peer '{peer_ip}' decided to disconnect due to '{:?}'", message.reason);
165                self.router().disconnect(peer_ip);
166                Ok(false)
167            }
168            Message::PeerRequest(..) => match self.peer_request(peer_ip) {
169                true => Ok(true),
170                false => bail!("Peer '{peer_ip}' sent an invalid peer request"),
171            },
172            Message::PeerResponse(message) => {
173                if !self.router().cache.contains_outbound_peer_request(peer_ip) {
174                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected peer response)")
175                }
176                self.router().cache.decrement_outbound_peer_requests(peer_ip);
177                if !self.router().allow_external_peers() {
178                    bail!("Not accepting peer response from '{peer_ip}' (validator gossip is disabled)");
179                }
180
181                match self.peer_response(peer_ip, &message.peers) {
182                    true => Ok(true),
183                    false => bail!("Peer '{peer_ip}' sent an invalid peer response"),
184                }
185            }
186            Message::Ping(message) => {
187                // Ensure the message protocol version is not outdated.
188                if !self.is_valid_message_version(message.version) {
189                    bail!("Dropping '{peer_ip}' on message version {} (outdated)", message.version);
190                }
191
192                // If the peer is a client or validator, ensure there are block locators.
193                let is_client_or_validator = message.node_type.is_client() || message.node_type.is_validator();
194                if is_client_or_validator && message.block_locators.is_none() {
195                    bail!("Peer '{peer_ip}' is a {}, but no block locators were provided", message.node_type);
196                }
197                // If the peer is a prover, ensure there are no block locators.
198                else if message.node_type.is_prover() && message.block_locators.is_some() {
199                    bail!("Peer '{peer_ip}' is a prover or client, but block locators were provided");
200                }
201
202                // Process the ping message.
203                match self.ping(peer_ip, message) {
204                    true => Ok(true),
205                    false => bail!("Peer '{peer_ip}' sent an invalid ping"),
206                }
207            }
208            Message::Pong(message) => match self.pong(peer_ip, message) {
209                true => Ok(true),
210                false => bail!("Peer '{peer_ip}' sent an invalid pong"),
211            },
212            Message::PuzzleRequest(..) => {
213                // Insert the puzzle request for the peer, and fetch the recent frequency.
214                let frequency = self.router().cache.insert_inbound_puzzle_request(peer_ip);
215                // Check if the number of puzzle requests is within the limit.
216                if frequency > Self::MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL {
217                    bail!("Peer '{peer_ip}' is not following the protocol (excessive puzzle requests)")
218                }
219                // Process the puzzle request.
220                match self.puzzle_request(peer_ip) {
221                    true => Ok(true),
222                    false => bail!("Peer '{peer_ip}' sent an invalid puzzle request"),
223                }
224            }
225            Message::PuzzleResponse(message) => {
226                // Check that this node previously sent a puzzle request to this peer.
227                if !self.router().cache.contains_outbound_puzzle_request(&peer_ip) {
228                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected puzzle response)")
229                }
230                // Decrement the number of puzzle requests.
231                self.router().cache.decrement_outbound_puzzle_requests(peer_ip);
232
233                // Perform the deferred non-blocking deserialization of the block header.
234                let header = match message.block_header.deserialize().await {
235                    Ok(header) => header,
236                    Err(error) => bail!("[PuzzleResponse] {error}"),
237                };
238                // Process the puzzle response.
239                match self.puzzle_response(peer_ip, message.epoch_hash, header) {
240                    true => Ok(true),
241                    false => bail!("Peer '{peer_ip}' sent an invalid puzzle response"),
242                }
243            }
244            Message::UnconfirmedSolution(message) => {
245                // Do not process unconfirmed solutions if the node is too far behind.
246                if !self.is_within_sync_leniency() {
247                    trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id);
248                    return Ok(true);
249                }
250
251                // Update the timestamp for the unconfirmed solution.
252                let seen_before = self.router().cache.insert_inbound_solution(peer_ip, message.solution_id).is_some();
253                // Determine whether to propagate the solution.
254                if seen_before {
255                    trace!("Skipping 'UnconfirmedSolution' from '{peer_ip}'");
256                    return Ok(true);
257                }
258                // Clone the serialized message.
259                let serialized = message.clone();
260                // Perform the deferred non-blocking deserialization of the solution.
261                let solution = match message.solution.deserialize().await {
262                    Ok(solution) => solution,
263                    Err(error) => bail!("[UnconfirmedSolution] {error}"),
264                };
265                // Check that the solution parameters match.
266                if message.solution_id != solution.id() {
267                    bail!("Peer '{peer_ip}' is not following the 'UnconfirmedSolution' protocol")
268                }
269                // Handle the unconfirmed solution.
270                match self.unconfirmed_solution(peer_ip, serialized, solution).await {
271                    true => Ok(true),
272                    false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed solution"),
273                }
274            }
275            Message::UnconfirmedTransaction(message) => {
276                // Do not process unconfirmed solutions if the node is too far behind.
277                if !self.is_within_sync_leniency() {
278                    trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id);
279                    return Ok(true);
280                }
281                // Update the timestamp for the unconfirmed transaction.
282                let seen_before =
283                    self.router().cache.insert_inbound_transaction(peer_ip, message.transaction_id).is_some();
284                // Determine whether to propagate the transaction.
285                if seen_before {
286                    trace!("Skipping 'UnconfirmedTransaction' from '{peer_ip}'");
287                    return Ok(true);
288                }
289                // Clone the serialized message.
290                let serialized = message.clone();
291                // Perform the deferred non-blocking deserialization of the transaction.
292                let transaction = match message.transaction.deserialize().await {
293                    Ok(transaction) => transaction,
294                    Err(error) => bail!("[UnconfirmedTransaction] {error}"),
295                };
296                // Check that the transaction parameters match.
297                if message.transaction_id != transaction.id() {
298                    bail!("Peer '{peer_ip}' is not following the 'UnconfirmedTransaction' protocol")
299                }
300                // Handle the unconfirmed transaction.
301                match self.unconfirmed_transaction(peer_ip, serialized, transaction).await {
302                    true => Ok(true),
303                    false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed transaction"),
304                }
305            }
306        }
307    }
308
309    /// Handles a `BlockRequest` message.
310    fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool;
311
312    /// Handles a `BlockResponse` message.
313    fn block_response(&self, peer_ip: SocketAddr, _blocks: Vec<Block<N>>) -> bool;
314
315    /// Handles a `PeerRequest` message.
316    fn peer_request(&self, peer_ip: SocketAddr) -> bool {
317        // Retrieve the connected peers.
318        let peers = self.router().connected_peers();
319        // Filter out invalid addresses.
320        let peers = match self.router().is_dev() {
321            // In development mode, relax the validity requirements to make operating devnets more flexible.
322            true => {
323                peers.into_iter().filter(|ip| *ip != peer_ip && !is_bogon_ip(ip.ip())).take(MAX_PEERS_TO_SEND).collect()
324            }
325            // In production mode, ensure the peer IPs are valid.
326            false => peers
327                .into_iter()
328                .filter(|ip| *ip != peer_ip && self.router().is_valid_peer_ip(ip))
329                .take(MAX_PEERS_TO_SEND)
330                .collect(),
331        };
332        // Send a `PeerResponse` message to the peer.
333        self.router().send(peer_ip, Message::PeerResponse(PeerResponse { peers }));
334        true
335    }
336
337    /// Handles a `PeerResponse` message.
338    fn peer_response(&self, _peer_ip: SocketAddr, peers: &[SocketAddr]) -> bool {
339        // Check if the number of peers received is less than MAX_PEERS_TO_SEND.
340        if peers.len() > MAX_PEERS_TO_SEND {
341            return false;
342        }
343        // Filter out invalid addresses.
344        let peers = match self.router().is_dev() {
345            // In development mode, relax the validity requirements to make operating devnets more flexible.
346            true => peers.iter().copied().filter(|ip| !is_bogon_ip(ip.ip())).collect::<Vec<_>>(),
347            // In production mode, ensure the peer IPs are valid.
348            false => peers.iter().copied().filter(|ip| self.router().is_valid_peer_ip(ip)).collect(),
349        };
350        // Adds the given peer IPs to the list of candidate peers.
351        self.router().insert_candidate_peers(&peers);
352        true
353    }
354
355    /// Handles a `Ping` message.
356    fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool;
357
358    /// Sleeps for a period and then sends a `Ping` message to the peer.
359    fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool;
360
361    /// Handles a `PuzzleRequest` message.
362    fn puzzle_request(&self, peer_ip: SocketAddr) -> bool;
363
364    /// Handles a `PuzzleResponse` message.
365    fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool;
366
367    /// Handles an `UnconfirmedSolution` message.
368    async fn unconfirmed_solution(
369        &self,
370        peer_ip: SocketAddr,
371        serialized: UnconfirmedSolution<N>,
372        solution: Solution<N>,
373    ) -> bool;
374
375    /// Handles an `UnconfirmedTransaction` message.
376    async fn unconfirmed_transaction(
377        &self,
378        peer_ip: SocketAddr,
379        serialized: UnconfirmedTransaction<N>,
380        _transaction: Transaction<N>,
381    ) -> bool;
382}