1use crate::{
17 Outbound,
18 Peer,
19 messages::{
20 BlockRequest,
21 BlockResponse,
22 DataBlocks,
23 Message,
24 PeerResponse,
25 Ping,
26 Pong,
27 UnconfirmedSolution,
28 UnconfirmedTransaction,
29 },
30};
31use snarkos_node_tcp::protocols::Reading;
32use snarkvm::prelude::{
33 Network,
34 block::{Block, Header, Transaction},
35 puzzle::Solution,
36};
37
38use anyhow::{Result, anyhow, bail};
39use snarkos_node_tcp::is_bogon_ip;
40use std::net::SocketAddr;
41use tokio::task::spawn_blocking;
42
43const MAX_PEERS_TO_SEND: usize = u8::MAX as usize;
45
46pub const SYNC_LENIENCY: u32 = 10;
49
50#[async_trait]
51pub trait Inbound<N: Network>: Reading + Outbound<N> {
52 const MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL: usize = 5;
54 const MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL: usize = 256;
56 const PING_SLEEP_IN_SECS: u64 = 20; const MESSAGE_LIMIT_TIME_FRAME_IN_SECS: i64 = 5;
60 const MESSAGE_LIMIT: usize = 500;
62
63 fn is_valid_message_version(&self, message_version: u32) -> bool;
65
66 async fn inbound(&self, peer_addr: SocketAddr, message: Message<N>) -> Result<()> {
68 let peer_ip = match self.router().resolve_to_listener(&peer_addr) {
70 Some(peer_ip) => peer_ip,
71 None => bail!("Unable to resolve the (ambiguous) peer address '{peer_addr}'"),
72 };
73
74 let num_messages = self.router().cache.insert_inbound_message(peer_ip, Self::MESSAGE_LIMIT_TIME_FRAME_IN_SECS);
77 if num_messages > Self::MESSAGE_LIMIT {
78 bail!("Dropping '{peer_ip}' for spamming messages (num_messages = {num_messages})")
79 }
80
81 trace!("Received '{}' from '{peer_ip}'", message.name());
82
83 self.router().update_last_seen_for_connected_peer(peer_ip);
85
86 match message {
89 Message::BlockRequest(message) => {
90 let BlockRequest { start_height, end_height } = &message;
91 let frequency = self.router().cache.insert_inbound_block_request(peer_ip);
93 if frequency > Self::MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL {
95 bail!("Peer '{peer_ip}' is not following the protocol (excessive block requests)")
96 }
97 if start_height >= end_height {
99 bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
100 }
101 if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
103 bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
104 }
105
106 let node = self.clone();
107 match spawn_blocking(move || node.block_request(peer_ip, message)).await? {
108 true => Ok(()),
109 false => bail!("Peer '{peer_ip}' sent an invalid block request"),
110 }
111 }
112 Message::BlockResponse(message) => {
113 let BlockResponse { request, blocks } = message;
114
115 if !self.router().cache.remove_outbound_block_request(peer_ip, &request) {
117 bail!("Peer '{peer_ip}' is not following the protocol (unexpected block response)")
118 }
119 let (send, recv) = tokio::sync::oneshot::channel();
123 rayon::spawn_fifo(move || {
124 let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
125 let _ = send.send(blocks);
126 });
127 let blocks = match recv.await {
128 Ok(Ok(blocks)) => blocks,
129 Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
130 Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
131 };
132
133 blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
135
136 let node = self.clone();
138 match spawn_blocking(move || node.block_response(peer_ip, blocks.0)).await? {
139 true => Ok(()),
140 false => bail!("Peer '{peer_ip}' sent an invalid block response"),
141 }
142 }
143 Message::ChallengeRequest(..) | Message::ChallengeResponse(..) => {
144 bail!("Peer '{peer_ip}' is not following the protocol")
146 }
147 Message::Disconnect(message) => {
148 bail!("{:?}", message.reason)
149 }
150 Message::PeerRequest(..) => match self.peer_request(peer_ip) {
151 true => Ok(()),
152 false => bail!("Peer '{peer_ip}' sent an invalid peer request"),
153 },
154 Message::PeerResponse(message) => {
155 if !self.router().cache.contains_outbound_peer_request(peer_ip) {
156 bail!("Peer '{peer_ip}' is not following the protocol (unexpected peer response)")
157 }
158 self.router().cache.decrement_outbound_peer_requests(peer_ip);
159 if !self.router().allow_external_peers() {
160 bail!("Not accepting peer response from '{peer_ip}' (validator gossip is disabled)");
161 }
162
163 match self.peer_response(peer_ip, &message.peers) {
164 true => Ok(()),
165 false => bail!("Peer '{peer_ip}' sent an invalid peer response"),
166 }
167 }
168 Message::Ping(message) => {
169 if !self.is_valid_message_version(message.version) {
171 bail!("Dropping '{peer_ip}' on message version {} (outdated)", message.version);
172 }
173
174 let is_client_or_validator = message.node_type.is_client() || message.node_type.is_validator();
176 if is_client_or_validator && message.block_locators.is_none() {
177 bail!("Peer '{peer_ip}' is a {}, but no block locators were provided", message.node_type);
178 }
179 else if message.node_type.is_prover() && message.block_locators.is_some() {
181 bail!("Peer '{peer_ip}' is a prover or client, but block locators were provided");
182 }
183
184 if let Err(error) =
186 self.router().update_connected_peer(peer_ip, message.node_type, |peer: &mut Peer<N>| {
187 peer.set_version(message.version);
189 peer.set_node_type(message.node_type);
191 })
192 {
193 bail!("[Ping] {error}");
194 }
195
196 match self.ping(peer_ip, message) {
198 true => Ok(()),
199 false => bail!("Peer '{peer_ip}' sent an invalid ping"),
200 }
201 }
202 Message::Pong(message) => match self.pong(peer_ip, message) {
203 true => Ok(()),
204 false => bail!("Peer '{peer_ip}' sent an invalid pong"),
205 },
206 Message::PuzzleRequest(..) => {
207 let frequency = self.router().cache.insert_inbound_puzzle_request(peer_ip);
209 if frequency > Self::MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL {
211 bail!("Peer '{peer_ip}' is not following the protocol (excessive puzzle requests)")
212 }
213 match self.puzzle_request(peer_ip) {
215 true => Ok(()),
216 false => bail!("Peer '{peer_ip}' sent an invalid puzzle request"),
217 }
218 }
219 Message::PuzzleResponse(message) => {
220 if !self.router().cache.contains_outbound_puzzle_request(&peer_ip) {
222 bail!("Peer '{peer_ip}' is not following the protocol (unexpected puzzle response)")
223 }
224 self.router().cache.decrement_outbound_puzzle_requests(peer_ip);
226
227 let header = match message.block_header.deserialize().await {
229 Ok(header) => header,
230 Err(error) => bail!("[PuzzleResponse] {error}"),
231 };
232 match self.puzzle_response(peer_ip, message.epoch_hash, header) {
234 true => Ok(()),
235 false => bail!("Peer '{peer_ip}' sent an invalid puzzle response"),
236 }
237 }
238 Message::UnconfirmedSolution(message) => {
239 if self.num_blocks_behind() > SYNC_LENIENCY {
241 trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id);
242 return Ok(());
243 }
244 let seen_before = self.router().cache.insert_inbound_solution(peer_ip, message.solution_id).is_some();
246 if seen_before {
248 trace!("Skipping 'UnconfirmedSolution' from '{peer_ip}'");
249 return Ok(());
250 }
251 let serialized = message.clone();
253 let solution = match message.solution.deserialize().await {
255 Ok(solution) => solution,
256 Err(error) => bail!("[UnconfirmedSolution] {error}"),
257 };
258 if message.solution_id != solution.id() {
260 bail!("Peer '{peer_ip}' is not following the 'UnconfirmedSolution' protocol")
261 }
262 match self.unconfirmed_solution(peer_ip, serialized, solution).await {
264 true => Ok(()),
265 false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed solution"),
266 }
267 }
268 Message::UnconfirmedTransaction(message) => {
269 if self.num_blocks_behind() > SYNC_LENIENCY {
271 trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id);
272 return Ok(());
273 }
274 let seen_before =
276 self.router().cache.insert_inbound_transaction(peer_ip, message.transaction_id).is_some();
277 if seen_before {
279 trace!("Skipping 'UnconfirmedTransaction' from '{peer_ip}'");
280 return Ok(());
281 }
282 let serialized = message.clone();
284 let transaction = match message.transaction.deserialize().await {
286 Ok(transaction) => transaction,
287 Err(error) => bail!("[UnconfirmedTransaction] {error}"),
288 };
289 if message.transaction_id != transaction.id() {
291 bail!("Peer '{peer_ip}' is not following the 'UnconfirmedTransaction' protocol")
292 }
293 match self.unconfirmed_transaction(peer_ip, serialized, transaction).await {
295 true => Ok(()),
296 false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed transaction"),
297 }
298 }
299 }
300 }
301
302 fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool;
304
305 fn block_response(&self, peer_ip: SocketAddr, _blocks: Vec<Block<N>>) -> bool;
307
308 fn peer_request(&self, peer_ip: SocketAddr) -> bool {
310 let peers = self.router().connected_peers();
312 let peers = match self.router().is_dev() {
314 true => {
316 peers.into_iter().filter(|ip| *ip != peer_ip && !is_bogon_ip(ip.ip())).take(MAX_PEERS_TO_SEND).collect()
317 }
318 false => peers
320 .into_iter()
321 .filter(|ip| *ip != peer_ip && self.router().is_valid_peer_ip(ip))
322 .take(MAX_PEERS_TO_SEND)
323 .collect(),
324 };
325 self.send(peer_ip, Message::PeerResponse(PeerResponse { peers }));
327 true
328 }
329
330 fn peer_response(&self, _peer_ip: SocketAddr, peers: &[SocketAddr]) -> bool {
332 if peers.len() > MAX_PEERS_TO_SEND {
334 return false;
335 }
336 let peers = match self.router().is_dev() {
338 true => peers.iter().copied().filter(|ip| !is_bogon_ip(ip.ip())).collect::<Vec<_>>(),
340 false => peers.iter().copied().filter(|ip| self.router().is_valid_peer_ip(ip)).collect(),
342 };
343 self.router().insert_candidate_peers(&peers);
345 true
346 }
347
348 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool;
350
351 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool;
353
354 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool;
356
357 fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool;
359
360 async fn unconfirmed_solution(
362 &self,
363 peer_ip: SocketAddr,
364 serialized: UnconfirmedSolution<N>,
365 solution: Solution<N>,
366 ) -> bool;
367
368 async fn unconfirmed_transaction(
370 &self,
371 peer_ip: SocketAddr,
372 serialized: UnconfirmedTransaction<N>,
373 _transaction: Transaction<N>,
374 ) -> bool;
375}