1use crate::{
17 ConnectionMode,
18 NodeType,
19 PeerPoolHandling,
20 Router,
21 messages::{ChallengeRequest, ChallengeResponse, DisconnectReason, Message, MessageCodec, MessageTrait},
22};
23use snarkos_node_network::{get_repo_commit_hash, log_repo_sha_comparison};
24use snarkos_node_tcp::{ConnectionSide, P2P, Tcp};
25use snarkvm::{
26 ledger::narwhal::Data,
27 prelude::{Address, ConsensusVersion, Field, Network, block::Header, error, io_error},
28};
29
30use anyhow::{Result, bail};
31use futures::SinkExt;
32use rand::{Rng, rngs::OsRng};
33use std::{io, net::SocketAddr};
34use tokio::net::TcpStream;
35use tokio_stream::StreamExt;
36use tokio_util::codec::Framed;
37
38impl<N: Network> P2P for Router<N> {
39 fn tcp(&self) -> &Tcp {
41 &self.tcp
42 }
43}
44
45#[macro_export]
47macro_rules! expect_message {
48 ($msg_ty:path, $framed:expr, $peer_addr:expr) => {{
49 use snarkvm::utilities::io_error;
50
51 match $framed.try_next().await? {
52 Some($msg_ty(data)) => {
54 trace!("Received '{}' from '{}'", data.name(), $peer_addr);
55 data
56 }
57 Some(Message::Disconnect($crate::messages::Disconnect { reason })) => {
59 return Err(io_error(format!("'{}' disconnected: {reason}", $peer_addr)));
60 }
61 Some(ty) => {
63 return Err(io_error(format!(
64 "'{}' did not follow the handshake protocol: received {:?} instead of {}",
65 $peer_addr,
66 ty.name(),
67 stringify!($msg_ty),
68 )));
69 }
70 None => {
72 return Err(io_error(format!(
73 "the peer disconnected before sending {:?}, likely due to peer saturation or shutdown",
74 stringify!($msg_ty),
75 )));
76 }
77 }
78 }};
79}
80
81async fn send<N: Network>(
83 framed: &mut Framed<&mut TcpStream, MessageCodec<N>>,
84 peer_addr: SocketAddr,
85 message: Message<N>,
86) -> io::Result<()> {
87 trace!("Sending '{}' to '{peer_addr}'", message.name());
88 framed.send(message).await
89}
90
91impl<N: Network> Router<N> {
92 pub async fn handshake<'a>(
94 &'a self,
95 peer_addr: SocketAddr,
96 stream: &'a mut TcpStream,
97 peer_side: ConnectionSide,
98 genesis_header: Header<N>,
99 restrictions_id: Field<N>,
100 ) -> io::Result<Option<ChallengeRequest<N>>> {
101 let mut listener_addr = if peer_side == ConnectionSide::Initiator {
104 debug!("Received a connection request from '{peer_addr}'");
105 None
106 } else {
107 debug!("Shaking hands with '{peer_addr}'...");
108 Some(peer_addr)
109 };
110
111 #[cfg(not(feature = "test"))]
113 if !self.is_dev() && peer_side == ConnectionSide::Initiator {
114 if self.is_ip_banned(peer_addr.ip()) {
116 trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip());
117 return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
118 }
119
120 let num_attempts =
121 self.cache.insert_inbound_connection(peer_addr.ip(), Router::<N>::CONNECTION_ATTEMPTS_SINCE_SECS);
122
123 debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
124 if num_attempts > Router::<N>::MAX_CONNECTION_ATTEMPTS {
125 self.update_ip_ban(peer_addr.ip());
126 trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
127 return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
128 }
129 }
130
131 let handshake_result = if peer_side == ConnectionSide::Responder {
133 self.handshake_inner_initiator(peer_addr, stream, genesis_header, restrictions_id).await
134 } else {
135 self.handshake_inner_responder(peer_addr, &mut listener_addr, stream, genesis_header, restrictions_id).await
136 };
137
138 if let Some(addr) = listener_addr {
139 match handshake_result {
140 Ok(Some(ref cr)) => {
141 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
142 self.resolver.write().insert_peer(peer.listener_addr(), peer_addr, Some(cr.address));
143 peer.upgrade_to_connected(
144 peer_addr,
145 cr.listener_port,
146 cr.address,
147 cr.node_type,
148 cr.version,
149 ConnectionMode::Router,
150 );
151 }
152 #[cfg(feature = "metrics")]
153 self.update_metrics();
154 debug!("Completed the handshake with '{peer_addr}'");
155 }
156 Ok(None) => {
157 return Err(error(format!("Duplicate handshake attempt with '{addr}'")));
158 }
159 Err(_) => {
160 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
161 if peer.is_connecting() {
163 peer.downgrade_to_candidate(addr);
164 }
165 }
166 }
167 }
168 }
169
170 handshake_result
171 }
172
173 async fn handshake_inner_initiator<'a>(
175 &'a self,
176 peer_addr: SocketAddr,
177 stream: &'a mut TcpStream,
178 genesis_header: Header<N>,
179 restrictions_id: Field<N>,
180 ) -> io::Result<Option<ChallengeRequest<N>>> {
181 if !self.add_connecting_peer(peer_addr) {
183 return Ok(None);
185 }
186
187 let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
189
190 let rng = &mut OsRng;
192
193 let current_block_height = self.ledger.latest_block_height();
195 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
196 let snarkos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
197 (true, Some(sha)) => Some(sha),
198 _ => None,
199 };
200
201 let our_nonce = rng.r#gen();
205 let our_request =
207 ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce, snarkos_sha);
208 send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?;
209
210 let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
214 let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
216
217 if let Some(reason) = self
219 .verify_challenge_response(
220 peer_addr,
221 peer_request.address,
222 peer_request.node_type,
223 peer_response,
224 genesis_header,
225 restrictions_id,
226 our_nonce,
227 )
228 .await
229 {
230 send(&mut framed, peer_addr, reason.into()).await?;
231 return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
232 }
233 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
235 send(&mut framed, peer_addr, reason.into()).await?;
236 return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
237 }
238
239 let response_nonce: u64 = rng.r#gen();
242 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
243 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
245 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
246 };
247 let our_response = ChallengeResponse {
249 genesis_header,
250 restrictions_id,
251 signature: Data::Object(our_signature),
252 nonce: response_nonce,
253 };
254 send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
255
256 Ok(Some(peer_request))
257 }
258
259 async fn handshake_inner_responder<'a>(
261 &'a self,
262 peer_addr: SocketAddr,
263 listener_addr: &mut Option<SocketAddr>,
264 stream: &'a mut TcpStream,
265 genesis_header: Header<N>,
266 restrictions_id: Field<N>,
267 ) -> io::Result<Option<ChallengeRequest<N>>> {
268 let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
270
271 let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
275
276 let current_block_height = self.ledger.latest_block_height();
278 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
279 let snarkos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
280 (true, Some(sha)) => Some(sha),
281 _ => None,
282 };
283
284 *listener_addr = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
286 let listener_addr = listener_addr.unwrap();
287
288 if let Err(forbidden_message) = self.ensure_peer_is_allowed(listener_addr) {
290 return Err(error(format!("{forbidden_message}")));
291 }
292
293 if !self.add_connecting_peer(listener_addr) {
295 return Ok(None);
297 }
298
299 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
301 send(&mut framed, peer_addr, reason.into()).await?;
302 return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
303 }
304
305 let rng = &mut OsRng;
309
310 let response_nonce: u64 = rng.r#gen();
312 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
313 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
314 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
315 };
316 let our_response = ChallengeResponse {
318 genesis_header,
319 restrictions_id,
320 signature: Data::Object(our_signature),
321 nonce: response_nonce,
322 };
323 send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
324
325 let our_nonce = rng.r#gen();
327 let our_request =
329 ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce, snarkos_sha);
330 send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?;
331
332 let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
336 if let Some(reason) = self
338 .verify_challenge_response(
339 peer_addr,
340 peer_request.address,
341 peer_request.node_type,
342 peer_response,
343 genesis_header,
344 restrictions_id,
345 our_nonce,
346 )
347 .await
348 {
349 send(&mut framed, peer_addr, reason.into()).await?;
350 return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
351 }
352
353 Ok(Some(peer_request))
354 }
355
356 fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> {
358 if self.is_local_ip(listener_addr) {
360 bail!("Dropping connection request from '{listener_addr}' (attempted to self-connect)");
361 }
362 if self.node_type() == NodeType::Validator
364 && !self.is_trusted(listener_addr)
365 && !crate::bootstrap_peers::<N>(self.is_dev()).contains(&listener_addr)
366 {
367 bail!("Dropping connection request from '{listener_addr}' (untrusted)");
368 }
369 if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
371 bail!("Dropping connection request from '{listener_addr}' (untrusted)");
372 }
373 Ok(())
374 }
375
376 fn verify_challenge_request(
378 &self,
379 peer_addr: SocketAddr,
380 message: &ChallengeRequest<N>,
381 ) -> Option<DisconnectReason> {
382 let &ChallengeRequest { version, listener_port: _, node_type, address, nonce: _, ref snarkos_sha } = message;
384 log_repo_sha_comparison(peer_addr, snarkos_sha, Self::OWNER);
385
386 if !self.is_valid_message_version(version) {
388 warn!("Dropping '{peer_addr}' on version {version} (outdated)");
389 return Some(DisconnectReason::OutdatedClientVersion);
390 }
391
392 if self.node_type() == NodeType::Validator
394 && node_type == NodeType::Validator
395 && self.is_connected_address(address)
396 {
397 warn!("Dropping '{peer_addr}' for being already connected ({address})");
398 return Some(DisconnectReason::NoReasonGiven);
399 }
400
401 None
402 }
403
404 #[allow(clippy::too_many_arguments)]
406 async fn verify_challenge_response(
407 &self,
408 peer_addr: SocketAddr,
409 peer_address: Address<N>,
410 peer_node_type: NodeType,
411 response: ChallengeResponse<N>,
412 expected_genesis_header: Header<N>,
413 expected_restrictions_id: Field<N>,
414 expected_nonce: u64,
415 ) -> Option<DisconnectReason> {
416 let ChallengeResponse { genesis_header, restrictions_id, signature, nonce } = response;
418
419 if genesis_header != expected_genesis_header {
421 warn!("Handshake with '{peer_addr}' failed (incorrect block header)");
422 return Some(DisconnectReason::InvalidChallengeResponse);
423 }
424 if !peer_node_type.is_prover() && !self.node_type.is_prover() && restrictions_id != expected_restrictions_id {
426 warn!("Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
427 return Some(DisconnectReason::InvalidChallengeResponse);
428 }
429 let Ok(signature) = signature.deserialize().await else {
431 warn!("Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
432 return Some(DisconnectReason::InvalidChallengeResponse);
433 };
434 if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
436 warn!("Handshake with '{peer_addr}' failed (invalid signature)");
437 return Some(DisconnectReason::InvalidChallengeResponse);
438 }
439 None
440 }
441}