1use crate::{
17 Outbound,
18 PeerPoolHandling,
19 messages::{
20 BlockRequest,
21 BlockResponse,
22 DataBlocks,
23 Message,
24 PeerResponse,
25 Ping,
26 Pong,
27 UnconfirmedSolution,
28 UnconfirmedTransaction,
29 },
30};
31use snarkos_node_tcp::protocols::Reading;
32use snarkvm::prelude::{
33 ConsensusVersion,
34 Network,
35 block::{Block, Header, Transaction},
36 puzzle::Solution,
37};
38
39use anyhow::{Result, anyhow, bail};
40use std::net::SocketAddr;
41use tokio::task::spawn_blocking;
42
/// Cap on the number of peers carried by a single `PeerResponse` (255, i.e. `u8::MAX`).
///
/// `Inbound::peer_request` asks the router for at most this many peers when answering a
/// request, and `Inbound::peer_response` rejects any received list that is longer.
pub const MAX_PEERS_TO_SEND: usize = u8::MAX as usize;
45
46#[async_trait]
47pub trait Inbound<N: Network>: Reading + Outbound<N> {
48 const MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL: usize = 5;
50 const MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL: usize = 256;
52 const MAXIMUM_UNCONFIRMED_SOLUTIONS_PER_INTERVAL: usize = 64;
54 const PING_SLEEP_IN_SECS: u64 = 20; const MESSAGE_LIMIT_TIME_FRAME_IN_SECS: i64 = 5;
58 const MESSAGE_LIMIT: usize = 500;
60
61 fn is_valid_message_version(&self, message_version: u32) -> bool;
63
64 fn is_within_sync_leniency(&self) -> bool {
66 const SYNC_LENIENCY: u32 = 10;
69
70 if let Some(num) = self.num_blocks_behind() {
71 num <= SYNC_LENIENCY
72 } else {
73 true
75 }
76 }
77
78 async fn inbound(&self, peer_addr: SocketAddr, message: Message<N>) -> Result<bool> {
82 let peer_ip = match self.router().resolve_to_listener(peer_addr) {
84 Some(peer_ip) => peer_ip,
85 None => {
86 trace!("Dropping a {} from {peer_addr} - no longer connected.", message.name());
88 return Ok(false);
89 }
90 };
91
92 let num_messages = self.router().cache.insert_inbound_message(peer_ip, Self::MESSAGE_LIMIT_TIME_FRAME_IN_SECS);
95 if num_messages > Self::MESSAGE_LIMIT {
96 bail!("Dropping '{peer_ip}' for spamming messages (num_messages = {num_messages})")
97 }
98
99 trace!("Received '{}' from '{peer_ip}'", message.name());
100
101 self.router().update_last_seen_for_connected_peer(peer_ip);
103
104 match message {
107 Message::BlockRequest(message) => {
108 let BlockRequest { start_height, end_height } = &message;
109 let frequency = self.router().cache.insert_inbound_block_request(peer_ip);
111 if frequency > Self::MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL {
113 bail!("Peer '{peer_ip}' is not following the protocol (excessive block requests)")
114 }
115 if start_height >= end_height {
117 bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
118 }
119 if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
121 bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
122 }
123
124 let node = self.clone();
125 match spawn_blocking(move || node.block_request(peer_ip, message)).await? {
126 true => Ok(true),
127 false => bail!("Peer '{peer_ip}' sent an invalid block request"),
128 }
129 }
130 Message::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => {
131 if !self.router().cache.remove_outbound_block_request(peer_ip, &request) {
133 bail!("Peer '{peer_ip}' is not following the protocol (unexpected block response)")
134 }
135
136 let (send, recv) = tokio::sync::oneshot::channel();
140 rayon::spawn_fifo(move || {
141 let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
142 let _ = send.send(blocks);
143 });
144 let blocks = match recv.await {
145 Ok(Ok(blocks)) => blocks,
146 Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
147 Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
148 };
149
150 blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
152
153 let node = self.clone();
155 match spawn_blocking(move || node.block_response(peer_ip, blocks.0, latest_consensus_version)).await? {
156 true => Ok(true),
157 false => bail!("Peer '{peer_ip}' sent an invalid block response"),
158 }
159 }
160 Message::ChallengeRequest(..) | Message::ChallengeResponse(..) => {
161 bail!("Peer '{peer_ip}' is not following the protocol")
163 }
164 Message::Disconnect(message) => {
165 debug!("Peer '{peer_ip}' decided to disconnect due to '{}'", message.reason);
167 self.router().disconnect(peer_ip);
168 Ok(false)
169 }
170 Message::PeerRequest(..) => match self.peer_request(peer_ip) {
171 true => Ok(true),
172 false => bail!("Peer '{peer_ip}' sent an invalid peer request"),
173 },
174 Message::PeerResponse(message) => {
175 if !self.router().cache.contains_outbound_peer_request(peer_ip) {
176 bail!("Peer '{peer_ip}' is not following the protocol (unexpected peer response)")
177 }
178 self.router().cache.decrement_outbound_peer_requests(peer_ip);
179 if self.router().trusted_peers_only() {
180 bail!("Not accepting peer response from '{peer_ip}' (trusted peers only)");
181 }
182
183 match self.peer_response(peer_ip, message.peers) {
184 true => Ok(true),
185 false => bail!("Peer '{peer_ip}' sent an invalid peer response"),
186 }
187 }
188 Message::Ping(message) => {
189 if !self.is_valid_message_version(message.version) {
191 bail!("Dropping '{peer_ip}' on message version {} (outdated)", message.version);
192 }
193
194 let is_client_or_validator = message.node_type.is_client() || message.node_type.is_validator();
196 if is_client_or_validator && message.block_locators.is_none() {
197 bail!("Peer '{peer_ip}' is a {}, but no block locators were provided", message.node_type);
198 }
199 else if message.node_type.is_prover() && message.block_locators.is_some() {
201 bail!("Peer '{peer_ip}' is a prover, but block locators were provided");
202 }
203
204 match self.ping(peer_ip, message) {
206 true => Ok(true),
207 false => bail!("Peer '{peer_ip}' sent an invalid ping"),
208 }
209 }
210 Message::Pong(message) => match self.pong(peer_ip, message) {
211 true => Ok(true),
212 false => bail!("Peer '{peer_ip}' sent an invalid pong"),
213 },
214 Message::PuzzleRequest(..) => {
215 let frequency = self.router().cache.insert_inbound_puzzle_request(peer_ip);
217 if frequency > Self::MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL {
219 bail!("Peer '{peer_ip}' is not following the protocol (excessive puzzle requests)")
220 }
221 match self.puzzle_request(peer_ip) {
223 true => Ok(true),
224 false => bail!("Peer '{peer_ip}' sent an invalid puzzle request"),
225 }
226 }
227 Message::PuzzleResponse(message) => {
228 if !self.router().cache.contains_outbound_puzzle_request(&peer_ip) {
230 bail!("Peer '{peer_ip}' is not following the protocol (unexpected puzzle response)")
231 }
232 self.router().cache.decrement_outbound_puzzle_requests(peer_ip);
234
235 let header = match message.block_header.deserialize().await {
237 Ok(header) => header,
238 Err(error) => bail!("[PuzzleResponse] {error}"),
239 };
240 match self.puzzle_response(peer_ip, message.epoch_hash, header) {
242 true => Ok(true),
243 false => bail!("Peer '{peer_ip}' sent an invalid puzzle response"),
244 }
245 }
246 Message::UnconfirmedSolution(message) => {
247 let frequency = self.router().cache.insert_inbound_unconfirmed_solution(peer_ip);
249 if frequency > Self::MAXIMUM_UNCONFIRMED_SOLUTIONS_PER_INTERVAL {
251 bail!("Peer '{peer_ip}' is not following the protocol (excessive unconfirmed solutions)")
252 }
253
254 if !self.is_within_sync_leniency() {
256 trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id);
257 return Ok(true);
258 }
259
260 let seen_before = self.router().cache.insert_inbound_solution(peer_ip, message.solution_id).is_some();
262 if seen_before {
264 trace!("Skipping 'UnconfirmedSolution' from '{peer_ip}'");
265 return Ok(true);
266 }
267 let serialized = message.clone();
269 let solution = match message.solution.deserialize().await {
271 Ok(solution) => solution,
272 Err(error) => bail!("[UnconfirmedSolution] {error}"),
273 };
274 if message.solution_id != solution.id() {
276 bail!("Peer '{peer_ip}' is not following the 'UnconfirmedSolution' protocol")
277 }
278 match self.unconfirmed_solution(peer_ip, serialized, solution).await {
280 true => Ok(true),
281 false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed solution"),
282 }
283 }
284 Message::UnconfirmedTransaction(message) => {
285 if !self.is_within_sync_leniency() {
287 trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id);
288 return Ok(true);
289 }
290 let seen_before =
292 self.router().cache.insert_inbound_transaction(peer_ip, message.transaction_id).is_some();
293 if seen_before {
295 trace!("Skipping 'UnconfirmedTransaction' from '{peer_ip}'");
296 return Ok(true);
297 }
298 let serialized = message.clone();
300 let transaction = match message.transaction.deserialize().await {
302 Ok(transaction) => transaction,
303 Err(error) => bail!("[UnconfirmedTransaction] {error}"),
304 };
305 if message.transaction_id != transaction.id() {
307 bail!("Peer '{peer_ip}' is not following the 'UnconfirmedTransaction' protocol")
308 }
309 match self.unconfirmed_transaction(peer_ip, serialized, transaction).await {
311 true => Ok(true),
312 false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed transaction"),
313 }
314 }
315 }
316 }
317
318 fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool;
320
321 fn block_response(
323 &self,
324 peer_ip: SocketAddr,
325 blocks: Vec<Block<N>>,
326 latest_consensus_version: Option<ConsensusVersion>,
327 ) -> bool;
328
329 fn peer_request(&self, peer_ip: SocketAddr) -> bool {
331 let peers = self.router().get_best_connected_peers(Some(MAX_PEERS_TO_SEND));
332 let peers = peers.into_iter().map(|peer| (peer.listener_addr, peer.last_height_seen)).collect();
333
334 self.router().send(peer_ip, Message::PeerResponse(PeerResponse { peers }));
336 true
337 }
338
339 fn peer_response(&self, _peer_ip: SocketAddr, peers: Vec<(SocketAddr, Option<u32>)>) -> bool {
341 if peers.len() > MAX_PEERS_TO_SEND {
343 return false;
344 }
345 if !peers.is_empty() {
347 self.router().insert_candidate_peers(peers);
348 }
349
350 #[cfg(feature = "metrics")]
351 self.router().update_metrics();
352
353 true
354 }
355
356 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool;
358
359 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool;
361
362 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool;
364
365 fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool;
367
368 async fn unconfirmed_solution(
370 &self,
371 peer_ip: SocketAddr,
372 serialized: UnconfirmedSolution<N>,
373 solution: Solution<N>,
374 ) -> bool;
375
376 async fn unconfirmed_transaction(
378 &self,
379 peer_ip: SocketAddr,
380 serialized: UnconfirmedTransaction<N>,
381 _transaction: Transaction<N>,
382 ) -> bool;
383}