1use crate::{
17 Outbound,
18 PeerPoolHandling,
19 messages::{
20 BlockRequest,
21 BlockResponse,
22 DataBlocks,
23 Message,
24 PeerResponse,
25 Ping,
26 Pong,
27 UnconfirmedSolution,
28 UnconfirmedTransaction,
29 },
30};
31use snarkos_node_tcp::protocols::Reading;
32use snarkvm::prelude::{
33 ConsensusVersion,
34 Network,
35 block::{Block, Header, Transaction},
36 puzzle::Solution,
37};
38
39use anyhow::{Result, anyhow, bail};
40use std::net::SocketAddr;
41use tokio::task::spawn_blocking;
42
/// The maximum number of peers carried by a single `PeerResponse`.
///
/// Used both as the cap when building an outgoing `PeerResponse` and as the
/// size above which an incoming `PeerResponse` is rejected.
pub const MAX_PEERS_TO_SEND: usize = u8::MAX as usize;
45
46#[async_trait]
47pub trait Inbound<N: Network>: Reading + Outbound<N> {
48 const MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL: usize = 5;
50 const MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL: usize = 256;
52 const PING_SLEEP_IN_SECS: u64 = 20; const MESSAGE_LIMIT_TIME_FRAME_IN_SECS: i64 = 5;
56 const MESSAGE_LIMIT: usize = 500;
58
59 fn is_valid_message_version(&self, message_version: u32) -> bool;
61
62 fn is_within_sync_leniency(&self) -> bool {
64 const SYNC_LENIENCY: u32 = 10;
67
68 if let Some(num) = self.num_blocks_behind() {
69 num <= SYNC_LENIENCY
70 } else {
71 true
73 }
74 }
75
76 async fn inbound(&self, peer_addr: SocketAddr, message: Message<N>) -> Result<bool> {
80 let peer_ip = match self.router().resolve_to_listener(peer_addr) {
82 Some(peer_ip) => peer_ip,
83 None => {
84 trace!("Dropping a {} from {peer_addr} - no longer connected.", message.name());
86 return Ok(false);
87 }
88 };
89
90 let num_messages = self.router().cache.insert_inbound_message(peer_ip, Self::MESSAGE_LIMIT_TIME_FRAME_IN_SECS);
93 if num_messages > Self::MESSAGE_LIMIT {
94 bail!("Dropping '{peer_ip}' for spamming messages (num_messages = {num_messages})")
95 }
96
97 trace!("Received '{}' from '{peer_ip}'", message.name());
98
99 self.router().update_last_seen_for_connected_peer(peer_ip);
101
102 match message {
105 Message::BlockRequest(message) => {
106 let BlockRequest { start_height, end_height } = &message;
107 let frequency = self.router().cache.insert_inbound_block_request(peer_ip);
109 if frequency > Self::MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL {
111 bail!("Peer '{peer_ip}' is not following the protocol (excessive block requests)")
112 }
113 if start_height >= end_height {
115 bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
116 }
117 if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
119 bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
120 }
121
122 let node = self.clone();
123 match spawn_blocking(move || node.block_request(peer_ip, message)).await? {
124 true => Ok(true),
125 false => bail!("Peer '{peer_ip}' sent an invalid block request"),
126 }
127 }
128 Message::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => {
129 if !self.router().cache.remove_outbound_block_request(peer_ip, &request) {
131 bail!("Peer '{peer_ip}' is not following the protocol (unexpected block response)")
132 }
133
134 let (send, recv) = tokio::sync::oneshot::channel();
138 rayon::spawn_fifo(move || {
139 let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
140 let _ = send.send(blocks);
141 });
142 let blocks = match recv.await {
143 Ok(Ok(blocks)) => blocks,
144 Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
145 Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
146 };
147
148 blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
150
151 let node = self.clone();
153 match spawn_blocking(move || node.block_response(peer_ip, blocks.0, latest_consensus_version)).await? {
154 true => Ok(true),
155 false => bail!("Peer '{peer_ip}' sent an invalid block response"),
156 }
157 }
158 Message::ChallengeRequest(..) | Message::ChallengeResponse(..) => {
159 bail!("Peer '{peer_ip}' is not following the protocol")
161 }
162 Message::Disconnect(message) => {
163 debug!("Peer '{peer_ip}' decided to disconnect due to '{:?}'", message.reason);
165 self.router().disconnect(peer_ip);
166 Ok(false)
167 }
168 Message::PeerRequest(..) => match self.peer_request(peer_ip) {
169 true => Ok(true),
170 false => bail!("Peer '{peer_ip}' sent an invalid peer request"),
171 },
172 Message::PeerResponse(message) => {
173 if !self.router().cache.contains_outbound_peer_request(peer_ip) {
174 bail!("Peer '{peer_ip}' is not following the protocol (unexpected peer response)")
175 }
176 self.router().cache.decrement_outbound_peer_requests(peer_ip);
177 if self.router().trusted_peers_only() {
178 bail!("Not accepting peer response from '{peer_ip}' (trusted peers only)");
179 }
180
181 match self.peer_response(peer_ip, message.peers) {
182 true => Ok(true),
183 false => bail!("Peer '{peer_ip}' sent an invalid peer response"),
184 }
185 }
186 Message::Ping(message) => {
187 if !self.is_valid_message_version(message.version) {
189 bail!("Dropping '{peer_ip}' on message version {} (outdated)", message.version);
190 }
191
192 let is_client_or_validator = message.node_type.is_client() || message.node_type.is_validator();
194 if is_client_or_validator && message.block_locators.is_none() {
195 bail!("Peer '{peer_ip}' is a {}, but no block locators were provided", message.node_type);
196 }
197 else if message.node_type.is_prover() && message.block_locators.is_some() {
199 bail!("Peer '{peer_ip}' is a prover, but block locators were provided");
200 }
201
202 match self.ping(peer_ip, message) {
204 true => Ok(true),
205 false => bail!("Peer '{peer_ip}' sent an invalid ping"),
206 }
207 }
208 Message::Pong(message) => match self.pong(peer_ip, message) {
209 true => Ok(true),
210 false => bail!("Peer '{peer_ip}' sent an invalid pong"),
211 },
212 Message::PuzzleRequest(..) => {
213 let frequency = self.router().cache.insert_inbound_puzzle_request(peer_ip);
215 if frequency > Self::MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL {
217 bail!("Peer '{peer_ip}' is not following the protocol (excessive puzzle requests)")
218 }
219 match self.puzzle_request(peer_ip) {
221 true => Ok(true),
222 false => bail!("Peer '{peer_ip}' sent an invalid puzzle request"),
223 }
224 }
225 Message::PuzzleResponse(message) => {
226 if !self.router().cache.contains_outbound_puzzle_request(&peer_ip) {
228 bail!("Peer '{peer_ip}' is not following the protocol (unexpected puzzle response)")
229 }
230 self.router().cache.decrement_outbound_puzzle_requests(peer_ip);
232
233 let header = match message.block_header.deserialize().await {
235 Ok(header) => header,
236 Err(error) => bail!("[PuzzleResponse] {error}"),
237 };
238 match self.puzzle_response(peer_ip, message.epoch_hash, header) {
240 true => Ok(true),
241 false => bail!("Peer '{peer_ip}' sent an invalid puzzle response"),
242 }
243 }
244 Message::UnconfirmedSolution(message) => {
245 if !self.is_within_sync_leniency() {
247 trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id);
248 return Ok(true);
249 }
250
251 let seen_before = self.router().cache.insert_inbound_solution(peer_ip, message.solution_id).is_some();
253 if seen_before {
255 trace!("Skipping 'UnconfirmedSolution' from '{peer_ip}'");
256 return Ok(true);
257 }
258 let serialized = message.clone();
260 let solution = match message.solution.deserialize().await {
262 Ok(solution) => solution,
263 Err(error) => bail!("[UnconfirmedSolution] {error}"),
264 };
265 if message.solution_id != solution.id() {
267 bail!("Peer '{peer_ip}' is not following the 'UnconfirmedSolution' protocol")
268 }
269 match self.unconfirmed_solution(peer_ip, serialized, solution).await {
271 true => Ok(true),
272 false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed solution"),
273 }
274 }
275 Message::UnconfirmedTransaction(message) => {
276 if !self.is_within_sync_leniency() {
278 trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id);
279 return Ok(true);
280 }
281 let seen_before =
283 self.router().cache.insert_inbound_transaction(peer_ip, message.transaction_id).is_some();
284 if seen_before {
286 trace!("Skipping 'UnconfirmedTransaction' from '{peer_ip}'");
287 return Ok(true);
288 }
289 let serialized = message.clone();
291 let transaction = match message.transaction.deserialize().await {
293 Ok(transaction) => transaction,
294 Err(error) => bail!("[UnconfirmedTransaction] {error}"),
295 };
296 if message.transaction_id != transaction.id() {
298 bail!("Peer '{peer_ip}' is not following the 'UnconfirmedTransaction' protocol")
299 }
300 match self.unconfirmed_transaction(peer_ip, serialized, transaction).await {
302 true => Ok(true),
303 false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed transaction"),
304 }
305 }
306 }
307 }
308
309 fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool;
311
312 fn block_response(
314 &self,
315 peer_ip: SocketAddr,
316 blocks: Vec<Block<N>>,
317 latest_consensus_version: Option<ConsensusVersion>,
318 ) -> bool;
319
320 fn peer_request(&self, peer_ip: SocketAddr) -> bool {
322 let peers = self.router().get_best_connected_peers(Some(MAX_PEERS_TO_SEND));
323 let peers = peers.into_iter().map(|peer| (peer.listener_addr, peer.last_height_seen)).collect();
324
325 self.router().send(peer_ip, Message::PeerResponse(PeerResponse { peers }));
327 true
328 }
329
330 fn peer_response(&self, _peer_ip: SocketAddr, peers: Vec<(SocketAddr, Option<u32>)>) -> bool {
332 if peers.len() > MAX_PEERS_TO_SEND {
334 return false;
335 }
336 if !peers.is_empty() {
338 self.router().insert_candidate_peers(peers);
339 }
340
341 #[cfg(feature = "metrics")]
342 self.router().update_metrics();
343
344 true
345 }
346
347 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool;
349
350 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool;
352
353 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool;
355
356 fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool;
358
359 async fn unconfirmed_solution(
361 &self,
362 peer_ip: SocketAddr,
363 serialized: UnconfirmedSolution<N>,
364 solution: Solution<N>,
365 ) -> bool;
366
367 async fn unconfirmed_transaction(
369 &self,
370 peer_ip: SocketAddr,
371 serialized: UnconfirmedTransaction<N>,
372 _transaction: Transaction<N>,
373 ) -> bool;
374}