snarkos_node_router/
inbound.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Outbound,
18    Peer,
19    messages::{
20        BlockRequest,
21        BlockResponse,
22        DataBlocks,
23        Message,
24        PeerResponse,
25        Ping,
26        Pong,
27        UnconfirmedSolution,
28        UnconfirmedTransaction,
29    },
30};
31use snarkos_node_tcp::protocols::Reading;
32use snarkvm::prelude::{
33    Network,
34    block::{Block, Header, Transaction},
35    puzzle::Solution,
36};
37
38use anyhow::{Result, anyhow, bail};
39use snarkos_node_tcp::is_bogon_ip;
40use std::net::SocketAddr;
41use tokio::task::spawn_blocking;
42
43/// The max number of peers to send in a `PeerResponse` message.
44const MAX_PEERS_TO_SEND: usize = u8::MAX as usize;
45
46#[async_trait]
47pub trait Inbound<N: Network>: Reading + Outbound<N> {
48    /// The maximum number of puzzle requests per interval.
49    const MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL: usize = 5;
50    /// The maximum number of block requests per interval.
51    const MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL: usize = 256;
52    /// The duration in seconds to sleep in between ping requests with a connected peer.
53    const PING_SLEEP_IN_SECS: u64 = 20; // 20 seconds
54    /// The time frame to enforce the `MESSAGE_LIMIT`.
55    const MESSAGE_LIMIT_TIME_FRAME_IN_SECS: i64 = 5;
56    /// The maximum number of messages accepted within `MESSAGE_LIMIT_TIME_FRAME_IN_SECS`.
57    const MESSAGE_LIMIT: usize = 500;
58
59    /// Returns `true` if the message version is valid.
60    fn is_valid_message_version(&self, message_version: u32) -> bool;
61
62    /// Is the node synced enough to process unconfirmed transactions and solutions?
63    fn is_within_sync_leniency(&self) -> bool {
64        // The maximum number of blocks the client can be behind it's latest peer before it skips
65        // processing incoming transactions and solutions.
66        const SYNC_LENIENCY: u32 = 10;
67
68        if let Some(num) = self.num_blocks_behind() {
69            num <= SYNC_LENIENCY
70        } else {
71            // We have not received block locators yet.
72            true
73        }
74    }
75
76    /// Handles the inbound message from the peer.
77    async fn inbound(&self, peer_addr: SocketAddr, message: Message<N>) -> Result<()> {
78        // Retrieve the listener IP for the peer.
79        let peer_ip = match self.router().resolve_to_listener(&peer_addr) {
80            Some(peer_ip) => peer_ip,
81            None => bail!("Unable to resolve the (ambiguous) peer address '{peer_addr}'"),
82        };
83
84        // Drop the peer, if they have sent more than `MESSAGE_LIMIT` messages
85        // in the last `MESSAGE_LIMIT_TIME_FRAME_IN_SECS` seconds.
86        let num_messages = self.router().cache.insert_inbound_message(peer_ip, Self::MESSAGE_LIMIT_TIME_FRAME_IN_SECS);
87        if num_messages > Self::MESSAGE_LIMIT {
88            bail!("Dropping '{peer_ip}' for spamming messages (num_messages = {num_messages})")
89        }
90
91        trace!("Received '{}' from '{peer_ip}'", message.name());
92
93        // Update the last seen timestamp of the peer.
94        self.router().update_last_seen_for_connected_peer(peer_ip);
95
96        // This match statement handles the inbound message by deserializing the message,
97        // checking that the message is valid, and then calling the appropriate (trait) handler.
98        match message {
99            Message::BlockRequest(message) => {
100                let BlockRequest { start_height, end_height } = &message;
101                // Insert the block request for the peer, and fetch the recent frequency.
102                let frequency = self.router().cache.insert_inbound_block_request(peer_ip);
103                // Check if the number of block requests is within the limit.
104                if frequency > Self::MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL {
105                    bail!("Peer '{peer_ip}' is not following the protocol (excessive block requests)")
106                }
107                // Ensure the block request is well-formed.
108                if start_height >= end_height {
109                    bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
110                }
111                // Ensure that the block request is within the allowed bounds.
112                if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
113                    bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
114                }
115
116                let node = self.clone();
117                match spawn_blocking(move || node.block_request(peer_ip, message)).await? {
118                    true => Ok(()),
119                    false => bail!("Peer '{peer_ip}' sent an invalid block request"),
120                }
121            }
122            Message::BlockResponse(message) => {
123                let BlockResponse { request, blocks } = message;
124
125                // Remove the block request, checking if this node previously sent a block request to this peer.
126                if !self.router().cache.remove_outbound_block_request(peer_ip, &request) {
127                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected block response)")
128                }
129                // Perform the deferred non-blocking deserialization of the blocks.
130                // The deserialization can take a long time (minutes). We should not be running
131                // this on a blocking task, but on a rayon thread pool.
132                let (send, recv) = tokio::sync::oneshot::channel();
133                rayon::spawn_fifo(move || {
134                    let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
135                    let _ = send.send(blocks);
136                });
137                let blocks = match recv.await {
138                    Ok(Ok(blocks)) => blocks,
139                    Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
140                    Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
141                };
142
143                // Ensure the block response is well-formed.
144                blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
145
146                // Process the block response.
147                let node = self.clone();
148                match spawn_blocking(move || node.block_response(peer_ip, blocks.0)).await? {
149                    true => Ok(()),
150                    false => bail!("Peer '{peer_ip}' sent an invalid block response"),
151                }
152            }
153            Message::ChallengeRequest(..) | Message::ChallengeResponse(..) => {
154                // Disconnect as the peer is not following the protocol.
155                bail!("Peer '{peer_ip}' is not following the protocol")
156            }
157            Message::Disconnect(message) => {
158                bail!("{:?}", message.reason)
159            }
160            Message::PeerRequest(..) => match self.peer_request(peer_ip) {
161                true => Ok(()),
162                false => bail!("Peer '{peer_ip}' sent an invalid peer request"),
163            },
164            Message::PeerResponse(message) => {
165                if !self.router().cache.contains_outbound_peer_request(peer_ip) {
166                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected peer response)")
167                }
168                self.router().cache.decrement_outbound_peer_requests(peer_ip);
169                if !self.router().allow_external_peers() {
170                    bail!("Not accepting peer response from '{peer_ip}' (validator gossip is disabled)");
171                }
172
173                match self.peer_response(peer_ip, &message.peers) {
174                    true => Ok(()),
175                    false => bail!("Peer '{peer_ip}' sent an invalid peer response"),
176                }
177            }
178            Message::Ping(message) => {
179                // Ensure the message protocol version is not outdated.
180                if !self.is_valid_message_version(message.version) {
181                    bail!("Dropping '{peer_ip}' on message version {} (outdated)", message.version);
182                }
183
184                // If the peer is a client or validator, ensure there are block locators.
185                let is_client_or_validator = message.node_type.is_client() || message.node_type.is_validator();
186                if is_client_or_validator && message.block_locators.is_none() {
187                    bail!("Peer '{peer_ip}' is a {}, but no block locators were provided", message.node_type);
188                }
189                // If the peer is a prover, ensure there are no block locators.
190                else if message.node_type.is_prover() && message.block_locators.is_some() {
191                    bail!("Peer '{peer_ip}' is a prover or client, but block locators were provided");
192                }
193
194                // Update the connected peer.
195                if let Err(error) =
196                    self.router().update_connected_peer(peer_ip, message.node_type, |peer: &mut Peer<N>| {
197                        // Update the version of the peer.
198                        peer.set_version(message.version);
199                        // Update the node type of the peer.
200                        peer.set_node_type(message.node_type);
201                    })
202                {
203                    bail!("[Ping] {error}");
204                }
205
206                // Process the ping message.
207                match self.ping(peer_ip, message) {
208                    true => Ok(()),
209                    false => bail!("Peer '{peer_ip}' sent an invalid ping"),
210                }
211            }
212            Message::Pong(message) => match self.pong(peer_ip, message) {
213                true => Ok(()),
214                false => bail!("Peer '{peer_ip}' sent an invalid pong"),
215            },
216            Message::PuzzleRequest(..) => {
217                // Insert the puzzle request for the peer, and fetch the recent frequency.
218                let frequency = self.router().cache.insert_inbound_puzzle_request(peer_ip);
219                // Check if the number of puzzle requests is within the limit.
220                if frequency > Self::MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL {
221                    bail!("Peer '{peer_ip}' is not following the protocol (excessive puzzle requests)")
222                }
223                // Process the puzzle request.
224                match self.puzzle_request(peer_ip) {
225                    true => Ok(()),
226                    false => bail!("Peer '{peer_ip}' sent an invalid puzzle request"),
227                }
228            }
229            Message::PuzzleResponse(message) => {
230                // Check that this node previously sent a puzzle request to this peer.
231                if !self.router().cache.contains_outbound_puzzle_request(&peer_ip) {
232                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected puzzle response)")
233                }
234                // Decrement the number of puzzle requests.
235                self.router().cache.decrement_outbound_puzzle_requests(peer_ip);
236
237                // Perform the deferred non-blocking deserialization of the block header.
238                let header = match message.block_header.deserialize().await {
239                    Ok(header) => header,
240                    Err(error) => bail!("[PuzzleResponse] {error}"),
241                };
242                // Process the puzzle response.
243                match self.puzzle_response(peer_ip, message.epoch_hash, header) {
244                    true => Ok(()),
245                    false => bail!("Peer '{peer_ip}' sent an invalid puzzle response"),
246                }
247            }
248            Message::UnconfirmedSolution(message) => {
249                // Do not process unconfirmed solutions if the node is too far behind.
250                if !self.is_within_sync_leniency() {
251                    trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id);
252                    return Ok(());
253                }
254
255                // Update the timestamp for the unconfirmed solution.
256                let seen_before = self.router().cache.insert_inbound_solution(peer_ip, message.solution_id).is_some();
257                // Determine whether to propagate the solution.
258                if seen_before {
259                    trace!("Skipping 'UnconfirmedSolution' from '{peer_ip}'");
260                    return Ok(());
261                }
262                // Clone the serialized message.
263                let serialized = message.clone();
264                // Perform the deferred non-blocking deserialization of the solution.
265                let solution = match message.solution.deserialize().await {
266                    Ok(solution) => solution,
267                    Err(error) => bail!("[UnconfirmedSolution] {error}"),
268                };
269                // Check that the solution parameters match.
270                if message.solution_id != solution.id() {
271                    bail!("Peer '{peer_ip}' is not following the 'UnconfirmedSolution' protocol")
272                }
273                // Handle the unconfirmed solution.
274                match self.unconfirmed_solution(peer_ip, serialized, solution).await {
275                    true => Ok(()),
276                    false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed solution"),
277                }
278            }
279            Message::UnconfirmedTransaction(message) => {
280                // Do not process unconfirmed solutions if the node is too far behind.
281                if !self.is_within_sync_leniency() {
282                    trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id);
283                    return Ok(());
284                }
285                // Update the timestamp for the unconfirmed transaction.
286                let seen_before =
287                    self.router().cache.insert_inbound_transaction(peer_ip, message.transaction_id).is_some();
288                // Determine whether to propagate the transaction.
289                if seen_before {
290                    trace!("Skipping 'UnconfirmedTransaction' from '{peer_ip}'");
291                    return Ok(());
292                }
293                // Clone the serialized message.
294                let serialized = message.clone();
295                // Perform the deferred non-blocking deserialization of the transaction.
296                let transaction = match message.transaction.deserialize().await {
297                    Ok(transaction) => transaction,
298                    Err(error) => bail!("[UnconfirmedTransaction] {error}"),
299                };
300                // Check that the transaction parameters match.
301                if message.transaction_id != transaction.id() {
302                    bail!("Peer '{peer_ip}' is not following the 'UnconfirmedTransaction' protocol")
303                }
304                // Handle the unconfirmed transaction.
305                match self.unconfirmed_transaction(peer_ip, serialized, transaction).await {
306                    true => Ok(()),
307                    false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed transaction"),
308                }
309            }
310        }
311    }
312
313    /// Handles a `BlockRequest` message.
314    fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool;
315
316    /// Handles a `BlockResponse` message.
317    fn block_response(&self, peer_ip: SocketAddr, _blocks: Vec<Block<N>>) -> bool;
318
319    /// Handles a `PeerRequest` message.
320    fn peer_request(&self, peer_ip: SocketAddr) -> bool {
321        // Retrieve the connected peers.
322        let peers = self.router().connected_peers();
323        // Filter out invalid addresses.
324        let peers = match self.router().is_dev() {
325            // In development mode, relax the validity requirements to make operating devnets more flexible.
326            true => {
327                peers.into_iter().filter(|ip| *ip != peer_ip && !is_bogon_ip(ip.ip())).take(MAX_PEERS_TO_SEND).collect()
328            }
329            // In production mode, ensure the peer IPs are valid.
330            false => peers
331                .into_iter()
332                .filter(|ip| *ip != peer_ip && self.router().is_valid_peer_ip(ip))
333                .take(MAX_PEERS_TO_SEND)
334                .collect(),
335        };
336        // Send a `PeerResponse` message to the peer.
337        self.router().send(peer_ip, Message::PeerResponse(PeerResponse { peers }));
338        true
339    }
340
341    /// Handles a `PeerResponse` message.
342    fn peer_response(&self, _peer_ip: SocketAddr, peers: &[SocketAddr]) -> bool {
343        // Check if the number of peers received is less than MAX_PEERS_TO_SEND.
344        if peers.len() > MAX_PEERS_TO_SEND {
345            return false;
346        }
347        // Filter out invalid addresses.
348        let peers = match self.router().is_dev() {
349            // In development mode, relax the validity requirements to make operating devnets more flexible.
350            true => peers.iter().copied().filter(|ip| !is_bogon_ip(ip.ip())).collect::<Vec<_>>(),
351            // In production mode, ensure the peer IPs are valid.
352            false => peers.iter().copied().filter(|ip| self.router().is_valid_peer_ip(ip)).collect(),
353        };
354        // Adds the given peer IPs to the list of candidate peers.
355        self.router().insert_candidate_peers(&peers);
356        true
357    }
358
359    /// Handles a `Ping` message.
360    fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool;
361
362    /// Sleeps for a period and then sends a `Ping` message to the peer.
363    fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool;
364
365    /// Handles a `PuzzleRequest` message.
366    fn puzzle_request(&self, peer_ip: SocketAddr) -> bool;
367
368    /// Handles a `PuzzleResponse` message.
369    fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool;
370
371    /// Handles an `UnconfirmedSolution` message.
372    async fn unconfirmed_solution(
373        &self,
374        peer_ip: SocketAddr,
375        serialized: UnconfirmedSolution<N>,
376        solution: Solution<N>,
377    ) -> bool;
378
379    /// Handles an `UnconfirmedTransaction` message.
380    async fn unconfirmed_transaction(
381        &self,
382        peer_ip: SocketAddr,
383        serialized: UnconfirmedTransaction<N>,
384        _transaction: Transaction<N>,
385    ) -> bool;
386}