snarkos_node_router/
inbound.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Outbound,
18    Peer,
19    messages::{
20        BlockRequest,
21        BlockResponse,
22        DataBlocks,
23        Message,
24        PeerResponse,
25        Ping,
26        Pong,
27        UnconfirmedSolution,
28        UnconfirmedTransaction,
29    },
30};
31use snarkos_node_tcp::protocols::Reading;
32use snarkvm::prelude::{
33    Network,
34    block::{Block, Header, Transaction},
35    puzzle::Solution,
36};
37
38use anyhow::{Result, anyhow, bail};
39use snarkos_node_tcp::is_bogon_ip;
40use std::net::SocketAddr;
41use tokio::task::spawn_blocking;
42
/// The maximum number of peers to send in a `PeerResponse` message (`u8::MAX`, i.e. 255).
///
/// `Inbound::peer_request` truncates its reply to this many addresses, and
/// `Inbound::peer_response` rejects any received list that is longer than this.
const MAX_PEERS_TO_SEND: usize = u8::MAX as usize;
45
/// The maximum number of blocks the client can be behind its latest peer before it skips
/// processing incoming transactions and solutions.
///
/// `Inbound::inbound` compares `num_blocks_behind()` against this value and silently ignores
/// `UnconfirmedSolution` and `UnconfirmedTransaction` messages while the node is further behind.
pub const SYNC_LENIENCY: u32 = 10;
49
50#[async_trait]
51pub trait Inbound<N: Network>: Reading + Outbound<N> {
    /// The maximum number of puzzle requests per interval.
    ///
    /// `inbound` bails if the frequency reported by the router cache for a peer exceeds this value.
    /// NOTE(review): the length of the interval is defined by the cache, not in this file.
    const MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL: usize = 5;
    /// The maximum number of block requests per interval.
    ///
    /// `inbound` bails if the frequency reported by the router cache for a peer exceeds this value.
    /// NOTE(review): the length of the interval is defined by the cache, not in this file.
    const MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL: usize = 256;
    /// The duration in seconds to sleep in between ping requests with a connected peer.
    const PING_SLEEP_IN_SECS: u64 = 20; // 20 seconds
    /// The time frame (in seconds) over which the `MESSAGE_LIMIT` is enforced.
    ///
    /// This value is passed to the router cache when recording each inbound message.
    const MESSAGE_LIMIT_TIME_FRAME_IN_SECS: i64 = 5;
    /// The maximum number of messages accepted within `MESSAGE_LIMIT_TIME_FRAME_IN_SECS`.
    ///
    /// `inbound` bails once a peer's message count for the time frame is strictly greater than this value.
    const MESSAGE_LIMIT: usize = 500;
62
    /// Returns `true` if the message version is valid.
    ///
    /// `inbound` calls this with the `version` field of every `Ping` message; when it returns
    /// `false`, the ping is rejected with an "outdated" error.
    fn is_valid_message_version(&self, message_version: u32) -> bool;
65
66    /// Handles the inbound message from the peer.
67    async fn inbound(&self, peer_addr: SocketAddr, message: Message<N>) -> Result<()> {
68        // Retrieve the listener IP for the peer.
69        let peer_ip = match self.router().resolve_to_listener(&peer_addr) {
70            Some(peer_ip) => peer_ip,
71            None => bail!("Unable to resolve the (ambiguous) peer address '{peer_addr}'"),
72        };
73
74        // Drop the peer, if they have sent more than `MESSAGE_LIMIT` messages
75        // in the last `MESSAGE_LIMIT_TIME_FRAME_IN_SECS` seconds.
76        let num_messages = self.router().cache.insert_inbound_message(peer_ip, Self::MESSAGE_LIMIT_TIME_FRAME_IN_SECS);
77        if num_messages > Self::MESSAGE_LIMIT {
78            bail!("Dropping '{peer_ip}' for spamming messages (num_messages = {num_messages})")
79        }
80
81        trace!("Received '{}' from '{peer_ip}'", message.name());
82
83        // Update the last seen timestamp of the peer.
84        self.router().update_last_seen_for_connected_peer(peer_ip);
85
86        // This match statement handles the inbound message by deserializing the message,
87        // checking that the message is valid, and then calling the appropriate (trait) handler.
88        match message {
89            Message::BlockRequest(message) => {
90                let BlockRequest { start_height, end_height } = &message;
91                // Insert the block request for the peer, and fetch the recent frequency.
92                let frequency = self.router().cache.insert_inbound_block_request(peer_ip);
93                // Check if the number of block requests is within the limit.
94                if frequency > Self::MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL {
95                    bail!("Peer '{peer_ip}' is not following the protocol (excessive block requests)")
96                }
97                // Ensure the block request is well-formed.
98                if start_height >= end_height {
99                    bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
100                }
101                // Ensure that the block request is within the allowed bounds.
102                if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
103                    bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
104                }
105
106                let node = self.clone();
107                match spawn_blocking(move || node.block_request(peer_ip, message)).await? {
108                    true => Ok(()),
109                    false => bail!("Peer '{peer_ip}' sent an invalid block request"),
110                }
111            }
112            Message::BlockResponse(message) => {
113                let BlockResponse { request, blocks } = message;
114
115                // Remove the block request, checking if this node previously sent a block request to this peer.
116                if !self.router().cache.remove_outbound_block_request(peer_ip, &request) {
117                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected block response)")
118                }
119                // Perform the deferred non-blocking deserialization of the blocks.
120                // The deserialization can take a long time (minutes). We should not be running
121                // this on a blocking task, but on a rayon thread pool.
122                let (send, recv) = tokio::sync::oneshot::channel();
123                rayon::spawn_fifo(move || {
124                    let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
125                    let _ = send.send(blocks);
126                });
127                let blocks = match recv.await {
128                    Ok(Ok(blocks)) => blocks,
129                    Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
130                    Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
131                };
132
133                // Ensure the block response is well-formed.
134                blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
135
136                // Process the block response.
137                let node = self.clone();
138                match spawn_blocking(move || node.block_response(peer_ip, blocks.0)).await? {
139                    true => Ok(()),
140                    false => bail!("Peer '{peer_ip}' sent an invalid block response"),
141                }
142            }
143            Message::ChallengeRequest(..) | Message::ChallengeResponse(..) => {
144                // Disconnect as the peer is not following the protocol.
145                bail!("Peer '{peer_ip}' is not following the protocol")
146            }
147            Message::Disconnect(message) => {
148                bail!("{:?}", message.reason)
149            }
150            Message::PeerRequest(..) => match self.peer_request(peer_ip) {
151                true => Ok(()),
152                false => bail!("Peer '{peer_ip}' sent an invalid peer request"),
153            },
154            Message::PeerResponse(message) => {
155                if !self.router().cache.contains_outbound_peer_request(peer_ip) {
156                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected peer response)")
157                }
158                self.router().cache.decrement_outbound_peer_requests(peer_ip);
159                if !self.router().allow_external_peers() {
160                    bail!("Not accepting peer response from '{peer_ip}' (validator gossip is disabled)");
161                }
162
163                match self.peer_response(peer_ip, &message.peers) {
164                    true => Ok(()),
165                    false => bail!("Peer '{peer_ip}' sent an invalid peer response"),
166                }
167            }
168            Message::Ping(message) => {
169                // Ensure the message protocol version is not outdated.
170                if !self.is_valid_message_version(message.version) {
171                    bail!("Dropping '{peer_ip}' on message version {} (outdated)", message.version);
172                }
173
174                // If the peer is a client or validator, ensure there are block locators.
175                let is_client_or_validator = message.node_type.is_client() || message.node_type.is_validator();
176                if is_client_or_validator && message.block_locators.is_none() {
177                    bail!("Peer '{peer_ip}' is a {}, but no block locators were provided", message.node_type);
178                }
179                // If the peer is a prover, ensure there are no block locators.
180                else if message.node_type.is_prover() && message.block_locators.is_some() {
181                    bail!("Peer '{peer_ip}' is a prover or client, but block locators were provided");
182                }
183
184                // Update the connected peer.
185                if let Err(error) =
186                    self.router().update_connected_peer(peer_ip, message.node_type, |peer: &mut Peer<N>| {
187                        // Update the version of the peer.
188                        peer.set_version(message.version);
189                        // Update the node type of the peer.
190                        peer.set_node_type(message.node_type);
191                    })
192                {
193                    bail!("[Ping] {error}");
194                }
195
196                // Process the ping message.
197                match self.ping(peer_ip, message) {
198                    true => Ok(()),
199                    false => bail!("Peer '{peer_ip}' sent an invalid ping"),
200                }
201            }
202            Message::Pong(message) => match self.pong(peer_ip, message) {
203                true => Ok(()),
204                false => bail!("Peer '{peer_ip}' sent an invalid pong"),
205            },
206            Message::PuzzleRequest(..) => {
207                // Insert the puzzle request for the peer, and fetch the recent frequency.
208                let frequency = self.router().cache.insert_inbound_puzzle_request(peer_ip);
209                // Check if the number of puzzle requests is within the limit.
210                if frequency > Self::MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL {
211                    bail!("Peer '{peer_ip}' is not following the protocol (excessive puzzle requests)")
212                }
213                // Process the puzzle request.
214                match self.puzzle_request(peer_ip) {
215                    true => Ok(()),
216                    false => bail!("Peer '{peer_ip}' sent an invalid puzzle request"),
217                }
218            }
219            Message::PuzzleResponse(message) => {
220                // Check that this node previously sent a puzzle request to this peer.
221                if !self.router().cache.contains_outbound_puzzle_request(&peer_ip) {
222                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected puzzle response)")
223                }
224                // Decrement the number of puzzle requests.
225                self.router().cache.decrement_outbound_puzzle_requests(peer_ip);
226
227                // Perform the deferred non-blocking deserialization of the block header.
228                let header = match message.block_header.deserialize().await {
229                    Ok(header) => header,
230                    Err(error) => bail!("[PuzzleResponse] {error}"),
231                };
232                // Process the puzzle response.
233                match self.puzzle_response(peer_ip, message.epoch_hash, header) {
234                    true => Ok(()),
235                    false => bail!("Peer '{peer_ip}' sent an invalid puzzle response"),
236                }
237            }
238            Message::UnconfirmedSolution(message) => {
239                // Do not process unconfirmed solutions if the node is too far behind.
240                if self.num_blocks_behind() > SYNC_LENIENCY {
241                    trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id);
242                    return Ok(());
243                }
244                // Update the timestamp for the unconfirmed solution.
245                let seen_before = self.router().cache.insert_inbound_solution(peer_ip, message.solution_id).is_some();
246                // Determine whether to propagate the solution.
247                if seen_before {
248                    trace!("Skipping 'UnconfirmedSolution' from '{peer_ip}'");
249                    return Ok(());
250                }
251                // Clone the serialized message.
252                let serialized = message.clone();
253                // Perform the deferred non-blocking deserialization of the solution.
254                let solution = match message.solution.deserialize().await {
255                    Ok(solution) => solution,
256                    Err(error) => bail!("[UnconfirmedSolution] {error}"),
257                };
258                // Check that the solution parameters match.
259                if message.solution_id != solution.id() {
260                    bail!("Peer '{peer_ip}' is not following the 'UnconfirmedSolution' protocol")
261                }
262                // Handle the unconfirmed solution.
263                match self.unconfirmed_solution(peer_ip, serialized, solution).await {
264                    true => Ok(()),
265                    false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed solution"),
266                }
267            }
268            Message::UnconfirmedTransaction(message) => {
269                // Do not process unconfirmed transactions if the node is too far behind.
270                if self.num_blocks_behind() > SYNC_LENIENCY {
271                    trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id);
272                    return Ok(());
273                }
274                // Update the timestamp for the unconfirmed transaction.
275                let seen_before =
276                    self.router().cache.insert_inbound_transaction(peer_ip, message.transaction_id).is_some();
277                // Determine whether to propagate the transaction.
278                if seen_before {
279                    trace!("Skipping 'UnconfirmedTransaction' from '{peer_ip}'");
280                    return Ok(());
281                }
282                // Clone the serialized message.
283                let serialized = message.clone();
284                // Perform the deferred non-blocking deserialization of the transaction.
285                let transaction = match message.transaction.deserialize().await {
286                    Ok(transaction) => transaction,
287                    Err(error) => bail!("[UnconfirmedTransaction] {error}"),
288                };
289                // Check that the transaction parameters match.
290                if message.transaction_id != transaction.id() {
291                    bail!("Peer '{peer_ip}' is not following the 'UnconfirmedTransaction' protocol")
292                }
293                // Handle the unconfirmed transaction.
294                match self.unconfirmed_transaction(peer_ip, serialized, transaction).await {
295                    true => Ok(()),
296                    false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed transaction"),
297                }
298            }
299        }
300    }
301
    /// Handles a `BlockRequest` message.
    ///
    /// `inbound` invokes this on a blocking task, after the per-peer request frequency and the
    /// requested height range have been validated. Returning `false` causes `inbound` to fail
    /// with an "invalid block request" error for `peer_ip`.
    fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool;
304
    /// Handles a `BlockResponse` message.
    ///
    /// `inbound` invokes this on a blocking task with the deserialized blocks, after confirming
    /// that a matching outbound block request existed and that the response is well-formed.
    /// Returning `false` causes `inbound` to fail with an "invalid block response" error for `peer_ip`.
    fn block_response(&self, peer_ip: SocketAddr, _blocks: Vec<Block<N>>) -> bool;
307
308    /// Handles a `PeerRequest` message.
309    fn peer_request(&self, peer_ip: SocketAddr) -> bool {
310        // Retrieve the connected peers.
311        let peers = self.router().connected_peers();
312        // Filter out invalid addresses.
313        let peers = match self.router().is_dev() {
314            // In development mode, relax the validity requirements to make operating devnets more flexible.
315            true => {
316                peers.into_iter().filter(|ip| *ip != peer_ip && !is_bogon_ip(ip.ip())).take(MAX_PEERS_TO_SEND).collect()
317            }
318            // In production mode, ensure the peer IPs are valid.
319            false => peers
320                .into_iter()
321                .filter(|ip| *ip != peer_ip && self.router().is_valid_peer_ip(ip))
322                .take(MAX_PEERS_TO_SEND)
323                .collect(),
324        };
325        // Send a `PeerResponse` message to the peer.
326        self.send(peer_ip, Message::PeerResponse(PeerResponse { peers }));
327        true
328    }
329
330    /// Handles a `PeerResponse` message.
331    fn peer_response(&self, _peer_ip: SocketAddr, peers: &[SocketAddr]) -> bool {
332        // Check if the number of peers received is less than MAX_PEERS_TO_SEND.
333        if peers.len() > MAX_PEERS_TO_SEND {
334            return false;
335        }
336        // Filter out invalid addresses.
337        let peers = match self.router().is_dev() {
338            // In development mode, relax the validity requirements to make operating devnets more flexible.
339            true => peers.iter().copied().filter(|ip| !is_bogon_ip(ip.ip())).collect::<Vec<_>>(),
340            // In production mode, ensure the peer IPs are valid.
341            false => peers.iter().copied().filter(|ip| self.router().is_valid_peer_ip(ip)).collect(),
342        };
343        // Adds the given peer IPs to the list of candidate peers.
344        self.router().insert_candidate_peers(&peers);
345        true
346    }
347
    /// Handles a `Ping` message.
    ///
    /// `inbound` invokes this after validating the message version and the presence (or absence)
    /// of block locators for the peer's node type, and after updating the connected peer's
    /// version and node type. Returning `false` causes `inbound` to fail with an
    /// "invalid ping" error for `peer_ip`.
    fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool;
350
    /// Handles a `Pong` message.
    ///
    /// `inbound` invokes this directly for every `Pong`; returning `false` causes `inbound` to
    /// fail with an "invalid pong" error for `peer_ip`.
    ///
    /// NOTE(review): implementations are expected to sleep for a period and then send a `Ping`
    /// message to the peer (presumably `PING_SLEEP_IN_SECS`) - confirm against the implementors.
    fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool;
353
    /// Handles a `PuzzleRequest` message.
    ///
    /// `inbound` invokes this after checking the peer's puzzle request frequency against
    /// `MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL`. Returning `false` causes `inbound` to fail with
    /// an "invalid puzzle request" error for `peer_ip`.
    fn puzzle_request(&self, peer_ip: SocketAddr) -> bool;
356
    /// Handles a `PuzzleResponse` message.
    ///
    /// `inbound` invokes this with the message's epoch hash and its deserialized block header,
    /// after confirming that this node has an outstanding puzzle request to the peer.
    /// Returning `false` causes `inbound` to fail with an "invalid puzzle response" error for `peer_ip`.
    fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool;
359
    /// Handles an `UnconfirmedSolution` message.
    ///
    /// `inbound` invokes this only when the node is at most `SYNC_LENIENCY` blocks behind and the
    /// solution was not seen before. `serialized` is a clone of the received message, and
    /// `solution` is its deserialized solution, whose ID was verified to match the message's
    /// `solution_id`. Returning `false` causes `inbound` to fail with an
    /// "invalid unconfirmed solution" error for `peer_ip`.
    async fn unconfirmed_solution(
        &self,
        peer_ip: SocketAddr,
        serialized: UnconfirmedSolution<N>,
        solution: Solution<N>,
    ) -> bool;
367
    /// Handles an `UnconfirmedTransaction` message.
    ///
    /// `inbound` invokes this only when the node is at most `SYNC_LENIENCY` blocks behind and the
    /// transaction was not seen before. `serialized` is a clone of the received message, and
    /// `_transaction` is its deserialized transaction, whose ID was verified to match the
    /// message's `transaction_id`. Returning `false` causes `inbound` to fail with an
    /// "invalid unconfirmed transaction" error for `peer_ip`.
    async fn unconfirmed_transaction(
        &self,
        peer_ip: SocketAddr,
        serialized: UnconfirmedTransaction<N>,
        _transaction: Transaction<N>,
    ) -> bool;
375}