1use crate::{
17 Outbound,
18 Peer,
19 messages::{
20 BlockRequest,
21 BlockResponse,
22 DataBlocks,
23 Message,
24 PeerResponse,
25 Ping,
26 Pong,
27 UnconfirmedSolution,
28 UnconfirmedTransaction,
29 },
30};
31use snarkos_node_tcp::protocols::Reading;
32use snarkvm::prelude::{
33 Network,
34 block::{Block, Header, Transaction},
35 puzzle::Solution,
36};
37
38use anyhow::{Result, anyhow, bail};
39use snarkos_node_tcp::is_bogon_ip;
40use std::net::SocketAddr;
41use tokio::task::spawn_blocking;
42
43const MAX_PEERS_TO_SEND: usize = u8::MAX as usize;
45
46#[async_trait]
47pub trait Inbound<N: Network>: Reading + Outbound<N> {
48 const MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL: usize = 5;
50 const MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL: usize = 256;
52 const PING_SLEEP_IN_SECS: u64 = 20; const MESSAGE_LIMIT_TIME_FRAME_IN_SECS: i64 = 5;
56 const MESSAGE_LIMIT: usize = 500;
58
59 fn is_valid_message_version(&self, message_version: u32) -> bool;
61
62 fn is_within_sync_leniency(&self) -> bool {
64 const SYNC_LENIENCY: u32 = 10;
67
68 if let Some(num) = self.num_blocks_behind() {
69 num <= SYNC_LENIENCY
70 } else {
71 true
73 }
74 }
75
76 async fn inbound(&self, peer_addr: SocketAddr, message: Message<N>) -> Result<()> {
78 let peer_ip = match self.router().resolve_to_listener(&peer_addr) {
80 Some(peer_ip) => peer_ip,
81 None => bail!("Unable to resolve the (ambiguous) peer address '{peer_addr}'"),
82 };
83
84 let num_messages = self.router().cache.insert_inbound_message(peer_ip, Self::MESSAGE_LIMIT_TIME_FRAME_IN_SECS);
87 if num_messages > Self::MESSAGE_LIMIT {
88 bail!("Dropping '{peer_ip}' for spamming messages (num_messages = {num_messages})")
89 }
90
91 trace!("Received '{}' from '{peer_ip}'", message.name());
92
93 self.router().update_last_seen_for_connected_peer(peer_ip);
95
96 match message {
99 Message::BlockRequest(message) => {
100 let BlockRequest { start_height, end_height } = &message;
101 let frequency = self.router().cache.insert_inbound_block_request(peer_ip);
103 if frequency > Self::MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL {
105 bail!("Peer '{peer_ip}' is not following the protocol (excessive block requests)")
106 }
107 if start_height >= end_height {
109 bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
110 }
111 if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
113 bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
114 }
115
116 let node = self.clone();
117 match spawn_blocking(move || node.block_request(peer_ip, message)).await? {
118 true => Ok(()),
119 false => bail!("Peer '{peer_ip}' sent an invalid block request"),
120 }
121 }
122 Message::BlockResponse(message) => {
123 let BlockResponse { request, blocks } = message;
124
125 if !self.router().cache.remove_outbound_block_request(peer_ip, &request) {
127 bail!("Peer '{peer_ip}' is not following the protocol (unexpected block response)")
128 }
129 let (send, recv) = tokio::sync::oneshot::channel();
133 rayon::spawn_fifo(move || {
134 let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
135 let _ = send.send(blocks);
136 });
137 let blocks = match recv.await {
138 Ok(Ok(blocks)) => blocks,
139 Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
140 Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
141 };
142
143 blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
145
146 let node = self.clone();
148 match spawn_blocking(move || node.block_response(peer_ip, blocks.0)).await? {
149 true => Ok(()),
150 false => bail!("Peer '{peer_ip}' sent an invalid block response"),
151 }
152 }
153 Message::ChallengeRequest(..) | Message::ChallengeResponse(..) => {
154 bail!("Peer '{peer_ip}' is not following the protocol")
156 }
157 Message::Disconnect(message) => {
158 bail!("{:?}", message.reason)
159 }
160 Message::PeerRequest(..) => match self.peer_request(peer_ip) {
161 true => Ok(()),
162 false => bail!("Peer '{peer_ip}' sent an invalid peer request"),
163 },
164 Message::PeerResponse(message) => {
165 if !self.router().cache.contains_outbound_peer_request(peer_ip) {
166 bail!("Peer '{peer_ip}' is not following the protocol (unexpected peer response)")
167 }
168 self.router().cache.decrement_outbound_peer_requests(peer_ip);
169 if !self.router().allow_external_peers() {
170 bail!("Not accepting peer response from '{peer_ip}' (validator gossip is disabled)");
171 }
172
173 match self.peer_response(peer_ip, &message.peers) {
174 true => Ok(()),
175 false => bail!("Peer '{peer_ip}' sent an invalid peer response"),
176 }
177 }
178 Message::Ping(message) => {
179 if !self.is_valid_message_version(message.version) {
181 bail!("Dropping '{peer_ip}' on message version {} (outdated)", message.version);
182 }
183
184 let is_client_or_validator = message.node_type.is_client() || message.node_type.is_validator();
186 if is_client_or_validator && message.block_locators.is_none() {
187 bail!("Peer '{peer_ip}' is a {}, but no block locators were provided", message.node_type);
188 }
189 else if message.node_type.is_prover() && message.block_locators.is_some() {
191 bail!("Peer '{peer_ip}' is a prover or client, but block locators were provided");
192 }
193
194 if let Err(error) =
196 self.router().update_connected_peer(peer_ip, message.node_type, |peer: &mut Peer<N>| {
197 peer.set_version(message.version);
199 peer.set_node_type(message.node_type);
201 })
202 {
203 bail!("[Ping] {error}");
204 }
205
206 match self.ping(peer_ip, message) {
208 true => Ok(()),
209 false => bail!("Peer '{peer_ip}' sent an invalid ping"),
210 }
211 }
212 Message::Pong(message) => match self.pong(peer_ip, message) {
213 true => Ok(()),
214 false => bail!("Peer '{peer_ip}' sent an invalid pong"),
215 },
216 Message::PuzzleRequest(..) => {
217 let frequency = self.router().cache.insert_inbound_puzzle_request(peer_ip);
219 if frequency > Self::MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL {
221 bail!("Peer '{peer_ip}' is not following the protocol (excessive puzzle requests)")
222 }
223 match self.puzzle_request(peer_ip) {
225 true => Ok(()),
226 false => bail!("Peer '{peer_ip}' sent an invalid puzzle request"),
227 }
228 }
229 Message::PuzzleResponse(message) => {
230 if !self.router().cache.contains_outbound_puzzle_request(&peer_ip) {
232 bail!("Peer '{peer_ip}' is not following the protocol (unexpected puzzle response)")
233 }
234 self.router().cache.decrement_outbound_puzzle_requests(peer_ip);
236
237 let header = match message.block_header.deserialize().await {
239 Ok(header) => header,
240 Err(error) => bail!("[PuzzleResponse] {error}"),
241 };
242 match self.puzzle_response(peer_ip, message.epoch_hash, header) {
244 true => Ok(()),
245 false => bail!("Peer '{peer_ip}' sent an invalid puzzle response"),
246 }
247 }
248 Message::UnconfirmedSolution(message) => {
249 if !self.is_within_sync_leniency() {
251 trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id);
252 return Ok(());
253 }
254
255 let seen_before = self.router().cache.insert_inbound_solution(peer_ip, message.solution_id).is_some();
257 if seen_before {
259 trace!("Skipping 'UnconfirmedSolution' from '{peer_ip}'");
260 return Ok(());
261 }
262 let serialized = message.clone();
264 let solution = match message.solution.deserialize().await {
266 Ok(solution) => solution,
267 Err(error) => bail!("[UnconfirmedSolution] {error}"),
268 };
269 if message.solution_id != solution.id() {
271 bail!("Peer '{peer_ip}' is not following the 'UnconfirmedSolution' protocol")
272 }
273 match self.unconfirmed_solution(peer_ip, serialized, solution).await {
275 true => Ok(()),
276 false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed solution"),
277 }
278 }
279 Message::UnconfirmedTransaction(message) => {
280 if !self.is_within_sync_leniency() {
282 trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id);
283 return Ok(());
284 }
285 let seen_before =
287 self.router().cache.insert_inbound_transaction(peer_ip, message.transaction_id).is_some();
288 if seen_before {
290 trace!("Skipping 'UnconfirmedTransaction' from '{peer_ip}'");
291 return Ok(());
292 }
293 let serialized = message.clone();
295 let transaction = match message.transaction.deserialize().await {
297 Ok(transaction) => transaction,
298 Err(error) => bail!("[UnconfirmedTransaction] {error}"),
299 };
300 if message.transaction_id != transaction.id() {
302 bail!("Peer '{peer_ip}' is not following the 'UnconfirmedTransaction' protocol")
303 }
304 match self.unconfirmed_transaction(peer_ip, serialized, transaction).await {
306 true => Ok(()),
307 false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed transaction"),
308 }
309 }
310 }
311 }
312
313 fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool;
315
316 fn block_response(&self, peer_ip: SocketAddr, _blocks: Vec<Block<N>>) -> bool;
318
319 fn peer_request(&self, peer_ip: SocketAddr) -> bool {
321 let peers = self.router().connected_peers();
323 let peers = match self.router().is_dev() {
325 true => {
327 peers.into_iter().filter(|ip| *ip != peer_ip && !is_bogon_ip(ip.ip())).take(MAX_PEERS_TO_SEND).collect()
328 }
329 false => peers
331 .into_iter()
332 .filter(|ip| *ip != peer_ip && self.router().is_valid_peer_ip(ip))
333 .take(MAX_PEERS_TO_SEND)
334 .collect(),
335 };
336 self.router().send(peer_ip, Message::PeerResponse(PeerResponse { peers }));
338 true
339 }
340
341 fn peer_response(&self, _peer_ip: SocketAddr, peers: &[SocketAddr]) -> bool {
343 if peers.len() > MAX_PEERS_TO_SEND {
345 return false;
346 }
347 let peers = match self.router().is_dev() {
349 true => peers.iter().copied().filter(|ip| !is_bogon_ip(ip.ip())).collect::<Vec<_>>(),
351 false => peers.iter().copied().filter(|ip| self.router().is_valid_peer_ip(ip)).collect(),
353 };
354 self.router().insert_candidate_peers(&peers);
356 true
357 }
358
359 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool;
361
362 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool;
364
365 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool;
367
368 fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool;
370
371 async fn unconfirmed_solution(
373 &self,
374 peer_ip: SocketAddr,
375 serialized: UnconfirmedSolution<N>,
376 solution: Solution<N>,
377 ) -> bool;
378
379 async fn unconfirmed_transaction(
381 &self,
382 peer_ip: SocketAddr,
383 serialized: UnconfirmedTransaction<N>,
384 _transaction: Transaction<N>,
385 ) -> bool;
386}