use crate::{
    Outbound,
    Peer,
    messages::{
        BlockRequest,
        BlockResponse,
        DataBlocks,
        Message,
        PeerResponse,
        Ping,
        Pong,
        UnconfirmedSolution,
        UnconfirmedTransaction,
    },
};
use snarkos_node_tcp::protocols::Reading;
use snarkvm::prelude::{
    Network,
    block::{Block, Header, Transaction},
    puzzle::Solution,
};

use anyhow::{Result, anyhow, bail};
use snarkos_node_tcp::is_bogon_ip;
use std::net::SocketAddr;
use tokio::task::spawn_blocking;

/// Upper bound on the number of peer addresses carried in a single `PeerResponse`.
///
/// `Inbound::peer_request` sends at most this many addresses, and `Inbound::peer_response`
/// rejects any response that lists more.
const MAX_PEERS_TO_SEND: usize = u8::MAX as usize;

/// Number of blocks this node may be behind (as reported by `num_blocks_behind()`) before
/// inbound unconfirmed solutions and transactions are skipped instead of processed.
pub const SYNC_LENIENCY: u32 = 10;

50#[async_trait]
51pub trait Inbound<N: Network>: Reading + Outbound<N> {
52 const MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL: usize = 5;
54 const MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL: usize = 256;
56 const PING_SLEEP_IN_SECS: u64 = 20; const MESSAGE_LIMIT_TIME_FRAME_IN_SECS: i64 = 5;
60 const MESSAGE_LIMIT: usize = 500;
62
63 async fn inbound(&self, peer_addr: SocketAddr, message: Message<N>) -> Result<()> {
65 let peer_ip = match self.router().resolve_to_listener(&peer_addr) {
67 Some(peer_ip) => peer_ip,
68 None => bail!("Unable to resolve the (ambiguous) peer address '{peer_addr}'"),
69 };
70
71 let num_messages = self.router().cache.insert_inbound_message(peer_ip, Self::MESSAGE_LIMIT_TIME_FRAME_IN_SECS);
74 if num_messages > Self::MESSAGE_LIMIT {
75 bail!("Dropping '{peer_ip}' for spamming messages (num_messages = {num_messages})")
76 }
77
78 trace!("Received '{}' from '{peer_ip}'", message.name());
79
80 self.router().update_last_seen_for_connected_peer(peer_ip);
82
83 match message {
86 Message::BlockRequest(message) => {
87 let BlockRequest { start_height, end_height } = &message;
88 let frequency = self.router().cache.insert_inbound_block_request(peer_ip);
90 if frequency > Self::MAXIMUM_BLOCK_REQUESTS_PER_INTERVAL {
92 bail!("Peer '{peer_ip}' is not following the protocol (excessive block requests)")
93 }
94 if start_height >= end_height {
96 bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
97 }
98 if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
100 bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
101 }
102
103 let node = self.clone();
104 match spawn_blocking(move || node.block_request(peer_ip, message)).await? {
105 true => Ok(()),
106 false => bail!("Peer '{peer_ip}' sent an invalid block request"),
107 }
108 }
109 Message::BlockResponse(message) => {
110 let BlockResponse { request, blocks } = message;
111
112 if !self.router().cache.remove_outbound_block_request(peer_ip, &request) {
114 bail!("Peer '{peer_ip}' is not following the protocol (unexpected block response)")
115 }
116 let (send, recv) = tokio::sync::oneshot::channel();
120 rayon::spawn_fifo(move || {
121 let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
122 let _ = send.send(blocks);
123 });
124 let blocks = match recv.await {
125 Ok(Ok(blocks)) => blocks,
126 Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
127 Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
128 };
129
130 blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
132
133 let node = self.clone();
135 match spawn_blocking(move || node.block_response(peer_ip, blocks.0)).await? {
136 true => Ok(()),
137 false => bail!("Peer '{peer_ip}' sent an invalid block response"),
138 }
139 }
140 Message::ChallengeRequest(..) | Message::ChallengeResponse(..) => {
141 bail!("Peer '{peer_ip}' is not following the protocol")
143 }
144 Message::Disconnect(message) => {
145 bail!("{:?}", message.reason)
146 }
147 Message::PeerRequest(..) => match self.peer_request(peer_ip) {
148 true => Ok(()),
149 false => bail!("Peer '{peer_ip}' sent an invalid peer request"),
150 },
151 Message::PeerResponse(message) => {
152 if !self.router().cache.contains_outbound_peer_request(peer_ip) {
153 bail!("Peer '{peer_ip}' is not following the protocol (unexpected peer response)")
154 }
155 self.router().cache.decrement_outbound_peer_requests(peer_ip);
156 if !self.router().allow_external_peers() {
157 bail!("Not accepting peer response from '{peer_ip}' (validator gossip is disabled)");
158 }
159
160 match self.peer_response(peer_ip, &message.peers) {
161 true => Ok(()),
162 false => bail!("Peer '{peer_ip}' sent an invalid peer response"),
163 }
164 }
165 Message::Ping(message) => {
166 if message.version < Message::<N>::VERSION {
168 bail!("Dropping '{peer_ip}' on message version {} (outdated)", message.version);
169 }
170
171 let is_client_or_validator = message.node_type.is_client() || message.node_type.is_validator();
173 if is_client_or_validator && message.block_locators.is_none() {
174 bail!("Peer '{peer_ip}' is a {}, but no block locators were provided", message.node_type);
175 }
176 else if message.node_type.is_prover() && message.block_locators.is_some() {
178 bail!("Peer '{peer_ip}' is a prover or client, but block locators were provided");
179 }
180
181 if let Err(error) =
183 self.router().update_connected_peer(peer_ip, message.node_type, |peer: &mut Peer<N>| {
184 peer.set_version(message.version);
186 peer.set_node_type(message.node_type);
188 })
189 {
190 bail!("[Ping] {error}");
191 }
192
193 match self.ping(peer_ip, message) {
195 true => Ok(()),
196 false => bail!("Peer '{peer_ip}' sent an invalid ping"),
197 }
198 }
199 Message::Pong(message) => match self.pong(peer_ip, message) {
200 true => Ok(()),
201 false => bail!("Peer '{peer_ip}' sent an invalid pong"),
202 },
203 Message::PuzzleRequest(..) => {
204 let frequency = self.router().cache.insert_inbound_puzzle_request(peer_ip);
206 if frequency > Self::MAXIMUM_PUZZLE_REQUESTS_PER_INTERVAL {
208 bail!("Peer '{peer_ip}' is not following the protocol (excessive puzzle requests)")
209 }
210 match self.puzzle_request(peer_ip) {
212 true => Ok(()),
213 false => bail!("Peer '{peer_ip}' sent an invalid puzzle request"),
214 }
215 }
216 Message::PuzzleResponse(message) => {
217 if !self.router().cache.contains_outbound_puzzle_request(&peer_ip) {
219 bail!("Peer '{peer_ip}' is not following the protocol (unexpected puzzle response)")
220 }
221 self.router().cache.decrement_outbound_puzzle_requests(peer_ip);
223
224 let header = match message.block_header.deserialize().await {
226 Ok(header) => header,
227 Err(error) => bail!("[PuzzleResponse] {error}"),
228 };
229 match self.puzzle_response(peer_ip, message.epoch_hash, header) {
231 true => Ok(()),
232 false => bail!("Peer '{peer_ip}' sent an invalid puzzle response"),
233 }
234 }
235 Message::UnconfirmedSolution(message) => {
236 if self.num_blocks_behind() > SYNC_LENIENCY {
238 trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id);
239 return Ok(());
240 }
241 let seen_before = self.router().cache.insert_inbound_solution(peer_ip, message.solution_id).is_some();
243 if seen_before {
245 trace!("Skipping 'UnconfirmedSolution' from '{peer_ip}'");
246 return Ok(());
247 }
248 let serialized = message.clone();
250 let solution = match message.solution.deserialize().await {
252 Ok(solution) => solution,
253 Err(error) => bail!("[UnconfirmedSolution] {error}"),
254 };
255 if message.solution_id != solution.id() {
257 bail!("Peer '{peer_ip}' is not following the 'UnconfirmedSolution' protocol")
258 }
259 match self.unconfirmed_solution(peer_ip, serialized, solution).await {
261 true => Ok(()),
262 false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed solution"),
263 }
264 }
265 Message::UnconfirmedTransaction(message) => {
266 if self.num_blocks_behind() > SYNC_LENIENCY {
268 trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id);
269 return Ok(());
270 }
271 let seen_before =
273 self.router().cache.insert_inbound_transaction(peer_ip, message.transaction_id).is_some();
274 if seen_before {
276 trace!("Skipping 'UnconfirmedTransaction' from '{peer_ip}'");
277 return Ok(());
278 }
279 let serialized = message.clone();
281 let transaction = match message.transaction.deserialize().await {
283 Ok(transaction) => transaction,
284 Err(error) => bail!("[UnconfirmedTransaction] {error}"),
285 };
286 if message.transaction_id != transaction.id() {
288 bail!("Peer '{peer_ip}' is not following the 'UnconfirmedTransaction' protocol")
289 }
290 match self.unconfirmed_transaction(peer_ip, serialized, transaction).await {
292 true => Ok(()),
293 false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed transaction"),
294 }
295 }
296 }
297 }
298
299 fn block_request(&self, peer_ip: SocketAddr, _message: BlockRequest) -> bool;
301
302 fn block_response(&self, peer_ip: SocketAddr, _blocks: Vec<Block<N>>) -> bool;
304
305 fn peer_request(&self, peer_ip: SocketAddr) -> bool {
307 let peers = self.router().connected_peers();
309 let peers = match self.router().is_dev() {
311 true => {
313 peers.into_iter().filter(|ip| *ip != peer_ip && !is_bogon_ip(ip.ip())).take(MAX_PEERS_TO_SEND).collect()
314 }
315 false => peers
317 .into_iter()
318 .filter(|ip| *ip != peer_ip && self.router().is_valid_peer_ip(ip))
319 .take(MAX_PEERS_TO_SEND)
320 .collect(),
321 };
322 self.send(peer_ip, Message::PeerResponse(PeerResponse { peers }));
324 true
325 }
326
327 fn peer_response(&self, _peer_ip: SocketAddr, peers: &[SocketAddr]) -> bool {
329 if peers.len() > MAX_PEERS_TO_SEND {
331 return false;
332 }
333 let peers = match self.router().is_dev() {
335 true => peers.iter().copied().filter(|ip| !is_bogon_ip(ip.ip())).collect::<Vec<_>>(),
337 false => peers.iter().copied().filter(|ip| self.router().is_valid_peer_ip(ip)).collect(),
339 };
340 self.router().insert_candidate_peers(&peers);
342 true
343 }
344
345 fn ping(&self, peer_ip: SocketAddr, message: Ping<N>) -> bool;
347
348 fn pong(&self, peer_ip: SocketAddr, _message: Pong) -> bool;
350
351 fn puzzle_request(&self, peer_ip: SocketAddr) -> bool;
353
354 fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool;
356
357 async fn unconfirmed_solution(
359 &self,
360 peer_ip: SocketAddr,
361 serialized: UnconfirmedSolution<N>,
362 solution: Solution<N>,
363 ) -> bool;
364
365 async fn unconfirmed_transaction(
367 &self,
368 peer_ip: SocketAddr,
369 serialized: UnconfirmedTransaction<N>,
370 _transaction: Transaction<N>,
371 ) -> bool;
372}