Skip to main content

snarkos_node_router/
inbound.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Outbound,
18    PeerPoolHandling,
19    messages::{
20        BlockRequest,
21        BlockResponse,
22        DataBlocks,
23        Message,
24        PeerResponse,
25        Ping,
26        Pong,
27        UnconfirmedSolution,
28        UnconfirmedTransaction,
29    },
30};
31use snarkos_node_tcp::protocols::Reading;
32use snarkvm::prelude::{
33    ConsensusVersion,
34    Network,
35    block::{Block, Header, Transaction},
36    puzzle::Solution,
37};
38
39use anyhow::{Result, anyhow, bail};
40use std::net::SocketAddr;
41use tokio::task::spawn_blocking;
42
/// The max number of peers to send in a `PeerResponse` message.
///
/// The default `Inbound::peer_request` handler passes this value as the limit when
/// collecting connected peers to share, and the default `Inbound::peer_response`
/// handler rejects any response that carries more peers than this.
pub const MAX_PEERS_TO_SEND: usize = u8::MAX as usize;
45
46#[async_trait]
47pub trait Inbound<N: Network>: Reading + Outbound<N> {
48    /// The maximum number of puzzle requests per interval.
49    const MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL: usize = 5;
50    /// The maximum number of block requests per interval.
51    const MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL: usize = 256;
52    /// The maximum number of unconfirmed solutions per interval (~3800 per hour at 60s interval).
53    const MAXIMUM_UNCONFIRMED_SOLUTIONS_PER_INTERVAL: usize = 64;
54    /// The duration in seconds to sleep in between ping requests with a connected peer.
55    const PING_SLEEP_IN_SECS: u64 = 20; // 20 seconds
56    /// The time frame to enforce the `MESSAGE_LIMIT`.
57    const MESSAGE_LIMIT_TIME_FRAME_IN_SECS: i64 = 5;
58    /// The maximum number of messages accepted within `MESSAGE_LIMIT_TIME_FRAME_IN_SECS`.
59    const MESSAGE_LIMIT: usize = 500;
60
61    /// Returns `true` if the message version is valid.
62    fn is_valid_message_version(&self, message_version: u32) -> bool;
63
64    /// Is the node synced enough to process unconfirmed transactions and solutions?
65    fn is_within_sync_leniency(&self) -> bool {
66        // The maximum number of blocks the client can be behind it's latest peer before it skips
67        // processing incoming transactions and solutions.
68        const SYNC_LENIENCY: u32 = 10;
69
70        if let Some(num) = self.num_blocks_behind() {
71            num <= SYNC_LENIENCY
72        } else {
73            // We have not received block locators yet.
74            true
75        }
76    }
77
78    /// Handles the inbound message from the peer. The returned value indicates whether
79    /// the connection is still active, and errors cause a disconnect once they are
80    /// propagated to the caller.
81    async fn inbound(&self, peer_addr: SocketAddr, message: Message<N>) -> Result<bool> {
82        // Retrieve the listener IP for the peer.
83        let peer_ip = match self.router().resolve_to_listener(peer_addr) {
84            Some(peer_ip) => peer_ip,
85            None => {
86                // No longer connected to the peer.
87                trace!("Dropping a {} from {peer_addr} - no longer connected.", message.name());
88                return Ok(false);
89            }
90        };
91
92        // Drop the peer, if they have sent more than `MESSAGE_LIMIT` messages
93        // in the last `MESSAGE_LIMIT_TIME_FRAME_IN_SECS` seconds.
94        let num_messages = self.router().cache.insert_inbound_message(peer_ip, Self::MESSAGE_LIMIT_TIME_FRAME_IN_SECS);
95        if num_messages > Self::MESSAGE_LIMIT {
96            bail!("Dropping '{peer_ip}' for spamming messages (num_messages = {num_messages})")
97        }
98
99        trace!("Received '{}' from '{peer_ip}'", message.name());
100
101        // Update the last seen timestamp of the peer.
102        self.router().update_last_seen_for_connected_peer(peer_ip);
103
104        // This match statement handles the inbound message by deserializing the message,
105        // checking that the message is valid, and then calling the appropriate (trait) handler.
106        match message {
107            Message::BlockRequest(message) => {
108                let BlockRequest { start_height, end_height } = &message;
109                // Insert the block request for the peer, and fetch the recent frequency.
110                let frequency = self.router().cache.insert_inbound_block_request(peer_ip);
111                // Check if the number of block requests is within the limit.
112                if frequency > Self::MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL {
113                    bail!("Peer '{peer_ip}' is not following the protocol (excessive block requests)")
114                }
115                // Ensure the block request is well-formed.
116                if start_height >= end_height {
117                    bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
118                }
119                // Ensure that the block request is within the allowed bounds.
120                if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
121                    bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
122                }
123
124                let node = self.clone();
125                match spawn_blocking(move || node.block_request(peer_ip, message)).await? {
126                    true => Ok(true),
127                    false => bail!("Peer '{peer_ip}' sent an invalid block request"),
128                }
129            }
130            Message::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => {
131                // Remove the block request, checking if this node previously sent a block request to this peer.
132                if !self.router().cache.remove_outbound_block_request(peer_ip, &request) {
133                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected block response)")
134                }
135
136                // Perform the deferred non-blocking deserialization of the blocks.
137                // The deserialization can take a long time (minutes). We should not be running
138                // this on a blocking task, but on a rayon thread pool.
139                let (send, recv) = tokio::sync::oneshot::channel();
140                rayon::spawn_fifo(move || {
141                    let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
142                    let _ = send.send(blocks);
143                });
144                let blocks = match recv.await {
145                    Ok(Ok(blocks)) => blocks,
146                    Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
147                    Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
148                };
149
150                // Ensure the block response is well-formed.
151                blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
152
153                // Process the block response.
154                let node = self.clone();
155                match spawn_blocking(move || node.block_response(peer_ip, blocks.0, latest_consensus_version)).await? {
156                    true => Ok(true),
157                    false => bail!("Peer '{peer_ip}' sent an invalid block response"),
158                }
159            }
160            Message::ChallengeRequest(..) | Message::ChallengeResponse(..) => {
161                // Disconnect as the peer is not following the protocol.
162                bail!("Peer '{peer_ip}' is not following the protocol")
163            }
164            Message::Disconnect(message) => {
165                // The peer informs us that they had disconnected. Disconnect from them too.
166                debug!("Peer '{peer_ip}' decided to disconnect due to '{}'", message.reason);
167                self.router().disconnect(peer_ip);
168                Ok(false)
169            }
170            Message::PeerRequest(..) => match self.peer_request(peer_ip) {
171                true => Ok(true),
172                false => bail!("Peer '{peer_ip}' sent an invalid peer request"),
173            },
174            Message::PeerResponse(message) => {
175                if !self.router().cache.contains_outbound_peer_request(peer_ip) {
176                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected peer response)")
177                }
178                self.router().cache.decrement_outbound_peer_requests(peer_ip);
179                if self.router().trusted_peers_only() {
180                    bail!("Not accepting peer response from '{peer_ip}' (trusted peers only)");
181                }
182
183                match self.peer_response(peer_ip, message.peers) {
184                    true => Ok(true),
185                    false => bail!("Peer '{peer_ip}' sent an invalid peer response"),
186                }
187            }
188            Message::Ping(message) => {
189                // Ensure the message protocol version is not outdated.
190                if !self.is_valid_message_version(message.version) {
191                    bail!("Dropping '{peer_ip}' on message version {} (outdated)", message.version);
192                }
193
194                // If the peer is a client or validator, ensure there are block locators.
195                let is_client_or_validator = message.node_type.is_client() || message.node_type.is_validator();
196                if is_client_or_validator && message.block_locators.is_none() {
197                    bail!("Peer '{peer_ip}' is a {}, but no block locators were provided", message.node_type);
198                }
199                // If the peer is a prover, ensure there are no block locators.
200                else if message.node_type.is_prover() && message.block_locators.is_some() {
201                    bail!("Peer '{peer_ip}' is a prover, but block locators were provided");
202                }
203
204                // Process the ping message.
205                match self.ping(peer_ip, message) {
206                    true => Ok(true),
207                    false => bail!("Peer '{peer_ip}' sent an invalid ping"),
208                }
209            }
210            Message::Pong(message) => match self.pong(peer_ip, message) {
211                true => Ok(true),
212                false => bail!("Peer '{peer_ip}' sent an invalid pong"),
213            },
214            Message::PuzzleRequest(..) => {
215                // Insert the puzzle request for the peer, and fetch the recent frequency.
216                let frequency = self.router().cache.insert_inbound_puzzle_request(peer_ip);
217                // Check if the number of puzzle requests is within the limit.
218                if frequency > Self::MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL {
219                    bail!("Peer '{peer_ip}' is not following the protocol (excessive puzzle requests)")
220                }
221                // Process the puzzle request.
222                match self.puzzle_request(peer_ip) {
223                    true => Ok(true),
224                    false => bail!("Peer '{peer_ip}' sent an invalid puzzle request"),
225                }
226            }
227            Message::PuzzleResponse(message) => {
228                // Check that this node previously sent a puzzle request to this peer.
229                if !self.router().cache.contains_outbound_puzzle_request(&peer_ip) {
230                    bail!("Peer '{peer_ip}' is not following the protocol (unexpected puzzle response)")
231                }
232                // Decrement the number of puzzle requests.
233                self.router().cache.decrement_outbound_puzzle_requests(peer_ip);
234
235                // Perform the deferred non-blocking deserialization of the block header.
236                let header = match message.block_header.deserialize().await {
237                    Ok(header) => header,
238                    Err(error) => bail!("[PuzzleResponse] {error}"),
239                };
240                // Process the puzzle response.
241                match self.puzzle_response(peer_ip, message.epoch_hash, header) {
242                    true => Ok(true),
243                    false => bail!("Peer '{peer_ip}' sent an invalid puzzle response"),
244                }
245            }
246            Message::UnconfirmedSolution(message) => {
247                // Insert the unconfirmed solution for the peer, and fetch the recent frequency.
248                let frequency = self.router().cache.insert_inbound_unconfirmed_solution(peer_ip);
249                // Check if the number of unconfirmed solutions is within the limit.
250                if frequency > Self::MAXIMUM_UNCONFIRMED_SOLUTIONS_PER_INTERVAL {
251                    bail!("Peer '{peer_ip}' is not following the protocol (excessive unconfirmed solutions)")
252                }
253
254                // Do not process unconfirmed solutions if the node is too far behind.
255                if !self.is_within_sync_leniency() {
256                    trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id);
257                    return Ok(true);
258                }
259
260                // Update the timestamp for the unconfirmed solution.
261                let seen_before = self.router().cache.insert_inbound_solution(peer_ip, message.solution_id).is_some();
262                // Determine whether to propagate the solution.
263                if seen_before {
264                    trace!("Skipping 'UnconfirmedSolution' from '{peer_ip}'");
265                    return Ok(true);
266                }
267                // Clone the serialized message.
268                let serialized = message.clone();
269                // Perform the deferred non-blocking deserialization of the solution.
270                let solution = match message.solution.deserialize().await {
271                    Ok(solution) => solution,
272                    Err(error) => bail!("[UnconfirmedSolution] {error}"),
273                };
274                // Check that the solution parameters match.
275                if message.solution_id != solution.id() {
276                    bail!("Peer '{peer_ip}' is not following the 'UnconfirmedSolution' protocol")
277                }
278                // Handle the unconfirmed solution.
279                match self.unconfirmed_solution(peer_ip, serialized, solution).await {
280                    true => Ok(true),
281                    false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed solution"),
282                }
283            }
284            Message::UnconfirmedTransaction(message) => {
285                // Do not process unconfirmed solutions if the node is too far behind.
286                if !self.is_within_sync_leniency() {
287                    trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id);
288                    return Ok(true);
289                }
290                // Update the timestamp for the unconfirmed transaction.
291                let seen_before =
292                    self.router().cache.insert_inbound_transaction(peer_ip, message.transaction_id).is_some();
293                // Determine whether to propagate the transaction.
294                if seen_before {
295                    trace!("Skipping 'UnconfirmedTransaction' from '{peer_ip}'");
296                    return Ok(true);
297                }
298                // Clone the serialized message.
299                let serialized = message.clone();
300                // Perform the deferred non-blocking deserialization of the transaction.
301                let transaction = match message.transaction.deserialize().await {
302                    Ok(transaction) => transaction,
303                    Err(error) => bail!("[UnconfirmedTransaction] {error}"),
304                };
305                // Check that the transaction parameters match.
306                if message.transaction_id != transaction.id() {
307                    bail!("Peer '{peer_ip}' is not following the 'UnconfirmedTransaction' protocol")
308                }
309                // Handle the unconfirmed transaction.
310                match self.unconfirmed_transaction(peer_ip, serialized, transaction).await {
311                    true => Ok(true),
312                    false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed transaction"),
313                }
314            }
315        }
316    }
317
318    /// Handles a `BlockRequest` message.
319    fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool;
320
321    /// Handles a `BlockResponse` message.
322    fn block_response(
323        &self,
324        peer_ip: SocketAddr,
325        blocks: Vec<Block<N>>,
326        latest_consensus_version: Option<ConsensusVersion>,
327    ) -> bool;
328
329    /// Handles a `PeerRequest` message.
330    fn peer_request(&self, peer_ip: SocketAddr) -> bool {
331        let peers = self.router().get_best_connected_peers(Some(MAX_PEERS_TO_SEND));
332        let peers = peers.into_iter().map(|peer| (peer.listener_addr, peer.last_height_seen)).collect();
333
334        // Send a `PeerResponse` message to the peer.
335        self.router().send(peer_ip, Message::PeerResponse(PeerResponse { peers }));
336        true
337    }
338
339    /// Handles a `PeerResponse` message.
340    fn peer_response(&self, _peer_ip: SocketAddr, peers: Vec<(SocketAddr, Option<u32>)>) -> bool {
341        // Check if the number of peers received is less than MAX_PEERS_TO_SEND.
342        if peers.len() > MAX_PEERS_TO_SEND {
343            return false;
344        }
345        // Adds the given peer IPs to the list of candidate peers.
346        if !peers.is_empty() {
347            self.router().insert_candidate_peers(peers);
348        }
349
350        #[cfg(feature = "metrics")]
351        self.router().update_metrics();
352
353        true
354    }
355
356    /// Handles a `Ping` message.
357    fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool;
358
359    /// Sleeps for a period and then sends a `Ping` message to the peer.
360    fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool;
361
362    /// Handles a `PuzzleRequest` message.
363    fn puzzle_request(&self, peer_ip: SocketAddr) -> bool;
364
365    /// Handles a `PuzzleResponse` message.
366    fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool;
367
368    /// Handles an `UnconfirmedSolution` message.
369    async fn unconfirmed_solution(
370        &self,
371        peer_ip: SocketAddr,
372        serialized: UnconfirmedSolution<N>,
373        solution: Solution<N>,
374    ) -> bool;
375
376    /// Handles an `UnconfirmedTransaction` message.
377    async fn unconfirmed_transaction(
378        &self,
379        peer_ip: SocketAddr,
380        serialized: UnconfirmedTransaction<N>,
381        _transaction: Transaction<N>,
382    ) -> bool;
383}