snarkos_node_router/
inbound.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Outbound,
18    PeerPoolHandling,
19    messages::{
20        BlockRequest,
21        BlockResponse,
22        DataBlocks,
23        Message,
24        PeerResponse,
25        Ping,
26        Pong,
27        UnconfirmedSolution,
28        UnconfirmedTransaction,
29    },
30};
31use snarkos_node_tcp::protocols::Reading;
32use snarkvm::prelude::{
33    ConsensusVersion,
34    Network,
35    block::{Block, Header, Transaction},
36    puzzle::Solution,
37};
38
39use anyhow::{Result, anyhow, bail};
40use std::net::SocketAddr;
41use tokio::task::spawn_blocking;
42
/// The maximum number of peers to send in a `PeerResponse` message.
///
/// The same bound is applied to incoming `PeerResponse` messages by `Inbound::peer_response`,
/// which rejects any peer list longer than this.
pub const MAX_PEERS_TO_SEND: usize = u8::MAX as usize;
45
46#[async_trait]
47pub trait Inbound<N: Network>: Reading + Outbound<N> {
48    /// The maximum number of puzzle requests per interval.
49    const MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL: usize = 5;
50    /// The maximum number of block requests per interval.
51    const MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL: usize = 256;
52    /// The duration in seconds to sleep in between ping requests with a connected peer.
53    const PING_SLEEP_IN_SECS: u64 = 20; // 20 seconds
54    /// The time frame to enforce the `MESSAGE_LIMIT`.
55    const MESSAGE_LIMIT_TIME_FRAME_IN_SECS: i64 = 5;
56    /// The maximum number of messages accepted within `MESSAGE_LIMIT_TIME_FRAME_IN_SECS`.
57    const MESSAGE_LIMIT: usize = 500;
58
59    /// Returns `true` if the message version is valid.
60    fn is_valid_message_version(&self, message_version: u32) -> bool;
61
62    /// Is the node synced enough to process unconfirmed transactions and solutions?
63    fn is_within_sync_leniency(&self) -> bool {
64        // The maximum number of blocks the client can be behind it's latest peer before it skips
65        // processing incoming transactions and solutions.
66        const SYNC_LENIENCY: u32 = 10;
67
68        if let Some(num) = self.num_blocks_behind() {
69            num <= SYNC_LENIENCY
70        } else {
71            // We have not received block locators yet.
72            true
73        }
74    }
75
76    /// Handles the inbound message from the peer. The returned value indicates whether
77    /// the connection is still active, and errors cause a disconnect once they are
78    /// propagated to the caller.
79    async fn inbound(&self, peer_addr: SocketAddr, message: Message<N>) -> Result<bool> {
80        // Retrieve the listener IP for the peer.
81        let peer_ip = match self.router().resolve_to_listener(peer_addr) {
82            Some(peer_ip) => peer_ip,
83            None => {
84                // No longer connected to the peer.
85                trace!("Dropping a {} from {peer_addr} - no longer connected.", message.name());
86                return Ok(false);
87            }
88        };
89
90        // Drop the peer, if they have sent more than `MESSAGE_LIMIT` messages
91        // in the last `MESSAGE_LIMIT_TIME_FRAME_IN_SECS` seconds.
92        let num_messages = self.router().cache.insert_inbound_message(peer_ip, Self::MESSAGE_LIMIT_TIME_FRAME_IN_SECS);
93        if num_messages > Self::MESSAGE_LIMIT {
94            bail!("Dropping '{peer_ip}' for spamming messages (num_messages = {num_messages})")
95        }
96
97        trace!("Received '{}' from '{peer_ip}'", message.name());
98
99        // Update the last seen timestamp of the peer.
100        self.router().update_last_seen_for_connected_peer(peer_ip);
101
102        // This match statement handles the inbound message by deserializing the message,
103        // checking that the message is valid, and then calling the appropriate (trait) handler.
104        match message {
105            Message::BlockRequest(message) => {
106                let BlockRequest { start_height, end_height } = &message;
107                // Insert the block request for the peer, and fetch the recent frequency.
108                let frequency = self.router().cache.insert_inbound_block_request(peer_ip);
109                // Check if the number of block requests is within the limit.
110                if frequency > Self::MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL {
111                    bail!("Peer '{peer_ip}' is not following the protocol (excessive block requests)")
112                }
113                // Ensure the block request is well-formed.
114                if start_height >= end_height {
115                    bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
116                }
117                // Ensure that the block request is within the allowed bounds.
118                if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
119                    bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
120                }
121
122                let node = self.clone();
123                match spawn_blocking(move || node.block_request(peer_ip, message)).await? {
124                    true => Ok(true),
125                    false => bail!("Peer '{peer_ip}' sent an invalid block request"),
126                }
127            }
128            Message::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => {
129                // Remove the block request, checking if this node previously sent a block request to this peer.
130                if !self.router().cache.remove_outbound_block_request(peer_ip, &request) {
131                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected block response)")
132                }
133
134                // Perform the deferred non-blocking deserialization of the blocks.
135                // The deserialization can take a long time (minutes). We should not be running
136                // this on a blocking task, but on a rayon thread pool.
137                let (send, recv) = tokio::sync::oneshot::channel();
138                rayon::spawn_fifo(move || {
139                    let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
140                    let _ = send.send(blocks);
141                });
142                let blocks = match recv.await {
143                    Ok(Ok(blocks)) => blocks,
144                    Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
145                    Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
146                };
147
148                // Ensure the block response is well-formed.
149                blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
150
151                // Process the block response.
152                let node = self.clone();
153                match spawn_blocking(move || node.block_response(peer_ip, blocks.0, latest_consensus_version)).await? {
154                    true => Ok(true),
155                    false => bail!("Peer '{peer_ip}' sent an invalid block response"),
156                }
157            }
158            Message::ChallengeRequest(..) | Message::ChallengeResponse(..) => {
159                // Disconnect as the peer is not following the protocol.
160                bail!("Peer '{peer_ip}' is not following the protocol")
161            }
162            Message::Disconnect(message) => {
163                // The peer informs us that they had disconnected. Disconnect from them too.
164                debug!("Peer '{peer_ip}' decided to disconnect due to '{:?}'", message.reason);
165                self.router().disconnect(peer_ip);
166                Ok(false)
167            }
168            Message::PeerRequest(..) => match self.peer_request(peer_ip) {
169                true => Ok(true),
170                false => bail!("Peer '{peer_ip}' sent an invalid peer request"),
171            },
172            Message::PeerResponse(message) => {
173                if !self.router().cache.contains_outbound_peer_request(peer_ip) {
174                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected peer response)")
175                }
176                self.router().cache.decrement_outbound_peer_requests(peer_ip);
177                if self.router().trusted_peers_only() {
178                    bail!("Not accepting peer response from '{peer_ip}' (trusted peers only)");
179                }
180
181                match self.peer_response(peer_ip, message.peers) {
182                    true => Ok(true),
183                    false => bail!("Peer '{peer_ip}' sent an invalid peer response"),
184                }
185            }
186            Message::Ping(message) => {
187                // Ensure the message protocol version is not outdated.
188                if !self.is_valid_message_version(message.version) {
189                    bail!("Dropping '{peer_ip}' on message version {} (outdated)", message.version);
190                }
191
192                // If the peer is a client or validator, ensure there are block locators.
193                let is_client_or_validator = message.node_type.is_client() || message.node_type.is_validator();
194                if is_client_or_validator && message.block_locators.is_none() {
195                    bail!("Peer '{peer_ip}' is a {}, but no block locators were provided", message.node_type);
196                }
197                // If the peer is a prover, ensure there are no block locators.
198                else if message.node_type.is_prover() && message.block_locators.is_some() {
199                    bail!("Peer '{peer_ip}' is a prover, but block locators were provided");
200                }
201
202                // Process the ping message.
203                match self.ping(peer_ip, message) {
204                    true => Ok(true),
205                    false => bail!("Peer '{peer_ip}' sent an invalid ping"),
206                }
207            }
208            Message::Pong(message) => match self.pong(peer_ip, message) {
209                true => Ok(true),
210                false => bail!("Peer '{peer_ip}' sent an invalid pong"),
211            },
212            Message::PuzzleRequest(..) => {
213                // Insert the puzzle request for the peer, and fetch the recent frequency.
214                let frequency = self.router().cache.insert_inbound_puzzle_request(peer_ip);
215                // Check if the number of puzzle requests is within the limit.
216                if frequency > Self::MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL {
217                    bail!("Peer '{peer_ip}' is not following the protocol (excessive puzzle requests)")
218                }
219                // Process the puzzle request.
220                match self.puzzle_request(peer_ip) {
221                    true => Ok(true),
222                    false => bail!("Peer '{peer_ip}' sent an invalid puzzle request"),
223                }
224            }
225            Message::PuzzleResponse(message) => {
226                // Check that this node previously sent a puzzle request to this peer.
227                if !self.router().cache.contains_outbound_puzzle_request(&peer_ip) {
228                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected puzzle response)")
229                }
230                // Decrement the number of puzzle requests.
231                self.router().cache.decrement_outbound_puzzle_requests(peer_ip);
232
233                // Perform the deferred non-blocking deserialization of the block header.
234                let header = match message.block_header.deserialize().await {
235                    Ok(header) => header,
236                    Err(error) => bail!("[PuzzleResponse] {error}"),
237                };
238                // Process the puzzle response.
239                match self.puzzle_response(peer_ip, message.epoch_hash, header) {
240                    true => Ok(true),
241                    false => bail!("Peer '{peer_ip}' sent an invalid puzzle response"),
242                }
243            }
244            Message::UnconfirmedSolution(message) => {
245                // Do not process unconfirmed solutions if the node is too far behind.
246                if !self.is_within_sync_leniency() {
247                    trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id);
248                    return Ok(true);
249                }
250
251                // Update the timestamp for the unconfirmed solution.
252                let seen_before = self.router().cache.insert_inbound_solution(peer_ip, message.solution_id).is_some();
253                // Determine whether to propagate the solution.
254                if seen_before {
255                    trace!("Skipping 'UnconfirmedSolution' from '{peer_ip}'");
256                    return Ok(true);
257                }
258                // Clone the serialized message.
259                let serialized = message.clone();
260                // Perform the deferred non-blocking deserialization of the solution.
261                let solution = match message.solution.deserialize().await {
262                    Ok(solution) => solution,
263                    Err(error) => bail!("[UnconfirmedSolution] {error}"),
264                };
265                // Check that the solution parameters match.
266                if message.solution_id != solution.id() {
267                    bail!("Peer '{peer_ip}' is not following the 'UnconfirmedSolution' protocol")
268                }
269                // Handle the unconfirmed solution.
270                match self.unconfirmed_solution(peer_ip, serialized, solution).await {
271                    true => Ok(true),
272                    false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed solution"),
273                }
274            }
275            Message::UnconfirmedTransaction(message) => {
276                // Do not process unconfirmed solutions if the node is too far behind.
277                if !self.is_within_sync_leniency() {
278                    trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id);
279                    return Ok(true);
280                }
281                // Update the timestamp for the unconfirmed transaction.
282                let seen_before =
283                    self.router().cache.insert_inbound_transaction(peer_ip, message.transaction_id).is_some();
284                // Determine whether to propagate the transaction.
285                if seen_before {
286                    trace!("Skipping 'UnconfirmedTransaction' from '{peer_ip}'");
287                    return Ok(true);
288                }
289                // Clone the serialized message.
290                let serialized = message.clone();
291                // Perform the deferred non-blocking deserialization of the transaction.
292                let transaction = match message.transaction.deserialize().await {
293                    Ok(transaction) => transaction,
294                    Err(error) => bail!("[UnconfirmedTransaction] {error}"),
295                };
296                // Check that the transaction parameters match.
297                if message.transaction_id != transaction.id() {
298                    bail!("Peer '{peer_ip}' is not following the 'UnconfirmedTransaction' protocol")
299                }
300                // Handle the unconfirmed transaction.
301                match self.unconfirmed_transaction(peer_ip, serialized, transaction).await {
302                    true => Ok(true),
303                    false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed transaction"),
304                }
305            }
306        }
307    }
308
309    /// Handles a `BlockRequest` message.
310    fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool;
311
312    /// Handles a `BlockResponse` message.
313    fn block_response(
314        &self,
315        peer_ip: SocketAddr,
316        blocks: Vec<Block<N>>,
317        latest_consensus_version: Option<ConsensusVersion>,
318    ) -> bool;
319
320    /// Handles a `PeerRequest` message.
321    fn peer_request(&self, peer_ip: SocketAddr) -> bool {
322        let peers = self.router().get_best_connected_peers(Some(MAX_PEERS_TO_SEND));
323        let peers = peers.into_iter().map(|peer| (peer.listener_addr, peer.last_height_seen)).collect();
324
325        // Send a `PeerResponse` message to the peer.
326        self.router().send(peer_ip, Message::PeerResponse(PeerResponse { peers }));
327        true
328    }
329
330    /// Handles a `PeerResponse` message.
331    fn peer_response(&self, _peer_ip: SocketAddr, peers: Vec<(SocketAddr, Option<u32>)>) -> bool {
332        // Check if the number of peers received is less than MAX_PEERS_TO_SEND.
333        if peers.len() > MAX_PEERS_TO_SEND {
334            return false;
335        }
336        // Adds the given peer IPs to the list of candidate peers.
337        if !peers.is_empty() {
338            self.router().insert_candidate_peers(peers);
339        }
340
341        #[cfg(feature = "metrics")]
342        self.router().update_metrics();
343
344        true
345    }
346
347    /// Handles a `Ping` message.
348    fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool;
349
350    /// Sleeps for a period and then sends a `Ping` message to the peer.
351    fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool;
352
353    /// Handles a `PuzzleRequest` message.
354    fn puzzle_request(&self, peer_ip: SocketAddr) -> bool;
355
356    /// Handles a `PuzzleResponse` message.
357    fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool;
358
359    /// Handles an `UnconfirmedSolution` message.
360    async fn unconfirmed_solution(
361        &self,
362        peer_ip: SocketAddr,
363        serialized: UnconfirmedSolution<N>,
364        solution: Solution<N>,
365    ) -> bool;
366
367    /// Handles an `UnconfirmedTransaction` message.
368    async fn unconfirmed_transaction(
369        &self,
370        peer_ip: SocketAddr,
371        serialized: UnconfirmedTransaction<N>,
372        _transaction: Transaction<N>,
373    ) -> bool;
374}