1use crate::{
17 Outbound,
18 messages::{
19 BlockRequest,
20 BlockResponse,
21 DataBlocks,
22 Message,
23 PeerResponse,
24 Ping,
25 Pong,
26 UnconfirmedSolution,
27 UnconfirmedTransaction,
28 },
29};
30use snarkos_node_tcp::protocols::Reading;
31use snarkvm::prelude::{
32 Network,
33 block::{Block, Header, Transaction},
34 puzzle::Solution,
35};
36
37use anyhow::{Result, anyhow, bail};
38use snarkos_node_tcp::is_bogon_ip;
39use std::net::SocketAddr;
40use tokio::task::spawn_blocking;
41
/// The maximum number of peer addresses exchanged in a single peer response.
///
/// `peer_request` truncates the list it sends to this many entries, and
/// `peer_response` rejects any received list that is longer than this.
const MAX_PEERS_TO_SEND: usize = u8::MAX as usize;
44
45#[async_trait]
46pub trait Inbound<N: Network>: Reading + Outbound<N> {
/// The maximum frequency of puzzle requests accepted from a single peer; `inbound`
/// errors out once the frequency reported by the cache exceeds this value.
/// NOTE(review): the length of the "interval" is defined by the cache, not here - confirm.
const MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL: usize = 5;
/// The maximum frequency of block requests accepted from a single peer; `inbound`
/// errors out once the frequency reported by the cache exceeds this value.
/// NOTE(review): the length of the "interval" is defined by the cache, not here - confirm.
const MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL: usize = 256;
/// Presumably the pause between pings, in seconds; it is not used by the code
/// visible in this trait - verify against the implementors.
const PING_SLEEP_IN_SECS: u64 = 20;
/// The time frame (in seconds) handed to the cache when counting a peer's inbound messages.
const MESSAGE_LIMIT_TIME_FRAME_IN_SECS: i64 = 5;
/// The maximum message count the cache may report for a peer before `inbound`
/// errors out with a "spamming messages" error.
const MESSAGE_LIMIT: usize = 500;
57
/// Returns `true` if the given message version is acceptable to this node.
///
/// `inbound` consults this for every `Ping`; a `false` result makes it fail with
/// an "outdated" message-version error.
fn is_valid_message_version(&self, message_version: u32) -> bool;
60
61 fn is_within_sync_leniency(&self) -> bool {
63 const SYNC_LENIENCY: u32 = 10;
66
67 if let Some(num) = self.num_blocks_behind() {
68 num <= SYNC_LENIENCY
69 } else {
70 true
72 }
73 }
74
75 async fn inbound(&self, peer_addr: SocketAddr, message: Message<N>) -> Result<bool> {
79 let peer_ip = match self.router().resolve_to_listener(&peer_addr) {
81 Some(peer_ip) => peer_ip,
82 None => {
83 trace!("Dropping a {} from {peer_addr} - no longer connected.", message.name());
85 return Ok(false);
86 }
87 };
88
89 let num_messages = self.router().cache.insert_inbound_message(peer_ip, Self::MESSAGE_LIMIT_TIME_FRAME_IN_SECS);
92 if num_messages > Self::MESSAGE_LIMIT {
93 bail!("Dropping '{peer_ip}' for spamming messages (num_messages = {num_messages})")
94 }
95
96 trace!("Received '{}' from '{peer_ip}'", message.name());
97
98 self.router().update_last_seen_for_connected_peer(peer_ip);
100
101 match message {
104 Message::BlockRequest(message) => {
105 let BlockRequest { start_height, end_height } = &message;
106 let frequency = self.router().cache.insert_inbound_block_request(peer_ip);
108 if frequency > Self::MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL {
110 bail!("Peer '{peer_ip}' is not following the protocol (excessive block requests)")
111 }
112 if start_height >= end_height {
114 bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
115 }
116 if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
118 bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
119 }
120
121 let node = self.clone();
122 match spawn_blocking(move || node.block_request(peer_ip, message)).await? {
123 true => Ok(true),
124 false => bail!("Peer '{peer_ip}' sent an invalid block request"),
125 }
126 }
127 Message::BlockResponse(message) => {
128 let BlockResponse { request, blocks } = message;
129
130 if !self.router().cache.remove_outbound_block_request(peer_ip, &request) {
132 bail!("Peer '{peer_ip}' is not following the protocol (unexpected block response)")
133 }
134 let (send, recv) = tokio::sync::oneshot::channel();
138 rayon::spawn_fifo(move || {
139 let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
140 let _ = send.send(blocks);
141 });
142 let blocks = match recv.await {
143 Ok(Ok(blocks)) => blocks,
144 Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
145 Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
146 };
147
148 blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
150
151 let node = self.clone();
153 match spawn_blocking(move || node.block_response(peer_ip, blocks.0)).await? {
154 true => Ok(true),
155 false => bail!("Peer '{peer_ip}' sent an invalid block response"),
156 }
157 }
158 Message::ChallengeRequest(..) | Message::ChallengeResponse(..) => {
159 bail!("Peer '{peer_ip}' is not following the protocol")
161 }
162 Message::Disconnect(message) => {
163 debug!("Peer '{peer_ip}' decided to disconnect due to '{:?}'", message.reason);
165 self.router().disconnect(peer_ip);
166 Ok(false)
167 }
168 Message::PeerRequest(..) => match self.peer_request(peer_ip) {
169 true => Ok(true),
170 false => bail!("Peer '{peer_ip}' sent an invalid peer request"),
171 },
172 Message::PeerResponse(message) => {
173 if !self.router().cache.contains_outbound_peer_request(peer_ip) {
174 bail!("Peer '{peer_ip}' is not following the protocol (unexpected peer response)")
175 }
176 self.router().cache.decrement_outbound_peer_requests(peer_ip);
177 if !self.router().allow_external_peers() {
178 bail!("Not accepting peer response from '{peer_ip}' (validator gossip is disabled)");
179 }
180
181 match self.peer_response(peer_ip, &message.peers) {
182 true => Ok(true),
183 false => bail!("Peer '{peer_ip}' sent an invalid peer response"),
184 }
185 }
186 Message::Ping(message) => {
187 if !self.is_valid_message_version(message.version) {
189 bail!("Dropping '{peer_ip}' on message version {} (outdated)", message.version);
190 }
191
192 let is_client_or_validator = message.node_type.is_client() || message.node_type.is_validator();
194 if is_client_or_validator && message.block_locators.is_none() {
195 bail!("Peer '{peer_ip}' is a {}, but no block locators were provided", message.node_type);
196 }
197 else if message.node_type.is_prover() && message.block_locators.is_some() {
199 bail!("Peer '{peer_ip}' is a prover or client, but block locators were provided");
200 }
201
202 match self.ping(peer_ip, message) {
204 true => Ok(true),
205 false => bail!("Peer '{peer_ip}' sent an invalid ping"),
206 }
207 }
208 Message::Pong(message) => match self.pong(peer_ip, message) {
209 true => Ok(true),
210 false => bail!("Peer '{peer_ip}' sent an invalid pong"),
211 },
212 Message::PuzzleRequest(..) => {
213 let frequency = self.router().cache.insert_inbound_puzzle_request(peer_ip);
215 if frequency > Self::MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL {
217 bail!("Peer '{peer_ip}' is not following the protocol (excessive puzzle requests)")
218 }
219 match self.puzzle_request(peer_ip) {
221 true => Ok(true),
222 false => bail!("Peer '{peer_ip}' sent an invalid puzzle request"),
223 }
224 }
225 Message::PuzzleResponse(message) => {
226 if !self.router().cache.contains_outbound_puzzle_request(&peer_ip) {
228 bail!("Peer '{peer_ip}' is not following the protocol (unexpected puzzle response)")
229 }
230 self.router().cache.decrement_outbound_puzzle_requests(peer_ip);
232
233 let header = match message.block_header.deserialize().await {
235 Ok(header) => header,
236 Err(error) => bail!("[PuzzleResponse] {error}"),
237 };
238 match self.puzzle_response(peer_ip, message.epoch_hash, header) {
240 true => Ok(true),
241 false => bail!("Peer '{peer_ip}' sent an invalid puzzle response"),
242 }
243 }
244 Message::UnconfirmedSolution(message) => {
245 if !self.is_within_sync_leniency() {
247 trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id);
248 return Ok(true);
249 }
250
251 let seen_before = self.router().cache.insert_inbound_solution(peer_ip, message.solution_id).is_some();
253 if seen_before {
255 trace!("Skipping 'UnconfirmedSolution' from '{peer_ip}'");
256 return Ok(true);
257 }
258 let serialized = message.clone();
260 let solution = match message.solution.deserialize().await {
262 Ok(solution) => solution,
263 Err(error) => bail!("[UnconfirmedSolution] {error}"),
264 };
265 if message.solution_id != solution.id() {
267 bail!("Peer '{peer_ip}' is not following the 'UnconfirmedSolution' protocol")
268 }
269 match self.unconfirmed_solution(peer_ip, serialized, solution).await {
271 true => Ok(true),
272 false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed solution"),
273 }
274 }
275 Message::UnconfirmedTransaction(message) => {
276 if !self.is_within_sync_leniency() {
278 trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id);
279 return Ok(true);
280 }
281 let seen_before =
283 self.router().cache.insert_inbound_transaction(peer_ip, message.transaction_id).is_some();
284 if seen_before {
286 trace!("Skipping 'UnconfirmedTransaction' from '{peer_ip}'");
287 return Ok(true);
288 }
289 let serialized = message.clone();
291 let transaction = match message.transaction.deserialize().await {
293 Ok(transaction) => transaction,
294 Err(error) => bail!("[UnconfirmedTransaction] {error}"),
295 };
296 if message.transaction_id != transaction.id() {
298 bail!("Peer '{peer_ip}' is not following the 'UnconfirmedTransaction' protocol")
299 }
300 match self.unconfirmed_transaction(peer_ip, serialized, transaction).await {
302 true => Ok(true),
303 false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed transaction"),
304 }
305 }
306 }
307 }
308
/// Handles a `BlockRequest` from the peer at `peer_ip`.
///
/// `inbound` invokes this on a blocking task once the request has passed its
/// frequency and range checks. Returning `false` makes `inbound` fail with an
/// "invalid block request" error.
fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool;
311
/// Handles the deserialized blocks of a `BlockResponse` from the peer at `peer_ip`.
///
/// `inbound` invokes this on a blocking task after the response was matched to an
/// outbound block request, deserialized, and checked with
/// `ensure_response_is_well_formed`. Returning `false` makes `inbound` fail with an
/// "invalid block response" error.
fn block_response(&self, peer_ip: SocketAddr, _blocks: Vec<Block<N>>) -> bool;
314
315 fn peer_request(&self, peer_ip: SocketAddr) -> bool {
317 let peers = self.router().connected_peers();
319 let peers = match self.router().is_dev() {
321 true => {
323 peers.into_iter().filter(|ip| *ip != peer_ip && !is_bogon_ip(ip.ip())).take(MAX_PEERS_TO_SEND).collect()
324 }
325 false => peers
327 .into_iter()
328 .filter(|ip| *ip != peer_ip && self.router().is_valid_peer_ip(ip))
329 .take(MAX_PEERS_TO_SEND)
330 .collect(),
331 };
332 self.router().send(peer_ip, Message::PeerResponse(PeerResponse { peers }));
334 true
335 }
336
337 fn peer_response(&self, _peer_ip: SocketAddr, peers: &[SocketAddr]) -> bool {
339 if peers.len() > MAX_PEERS_TO_SEND {
341 return false;
342 }
343 let peers = match self.router().is_dev() {
345 true => peers.iter().copied().filter(|ip| !is_bogon_ip(ip.ip())).collect::<Vec<_>>(),
347 false => peers.iter().copied().filter(|ip| self.router().is_valid_peer_ip(ip)).collect(),
349 };
350 self.router().insert_candidate_peers(&peers);
352 true
353 }
354
/// Handles a `Ping` from the peer at `peer_ip`.
///
/// `inbound` invokes this after checking the message version and the presence
/// (or absence) of block locators for the peer's node type. Returning `false`
/// makes `inbound` fail with an "invalid ping" error.
fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool;
357
/// Handles a `Pong` from the peer at `peer_ip`.
///
/// Returning `false` makes `inbound` fail with an "invalid pong" error.
fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool;
360
/// Handles a `PuzzleRequest` from the peer at `peer_ip`.
///
/// `inbound` invokes this once the request has passed the puzzle request frequency
/// check. Returning `false` makes `inbound` fail with an "invalid puzzle request" error.
fn puzzle_request(&self, peer_ip: SocketAddr) -> bool;
363
/// Handles the contents of a `PuzzleResponse` from the peer at `peer_ip`.
///
/// `inbound` invokes this after confirming that an outbound puzzle request to the
/// peer was outstanding and after deserializing the block header. Returning `false`
/// makes `inbound` fail with an "invalid puzzle response" error.
fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool;
366
/// Handles an `UnconfirmedSolution` from the peer at `peer_ip`.
///
/// `inbound` invokes this only when the node is within its sync leniency, the
/// cache held no prior entry for the solution ID, and the ID of the deserialized
/// `solution` matches the ID announced in the message. `serialized` is a clone of
/// the message as it was received.
///
/// Returning `false` makes `inbound` fail with an "invalid unconfirmed solution" error.
async fn unconfirmed_solution(
    &self,
    peer_ip: SocketAddr,
    serialized: UnconfirmedSolution<N>,
    solution: Solution<N>,
) -> bool;
374
/// Handles an `UnconfirmedTransaction` from the peer at `peer_ip`.
///
/// `inbound` invokes this only when the node is within its sync leniency, the
/// cache held no prior entry for the transaction ID, and the ID of the
/// deserialized transaction matches the ID announced in the message. `serialized`
/// is a clone of the message as it was received.
///
/// Returning `false` makes `inbound` fail with an "invalid unconfirmed transaction" error.
async fn unconfirmed_transaction(
    &self,
    peer_ip: SocketAddr,
    serialized: UnconfirmedTransaction<N>,
    _transaction: Transaction<N>,
) -> bool;
382}