1use crate::{
17 ConnectionMode,
18 NodeType,
19 PeerPoolHandling,
20 Router,
21 messages::{ChallengeRequest, ChallengeResponse, DisconnectReason, Message, MessageCodec, MessageTrait},
22};
23use snarkos_node_network::{built_info, log_repo_sha_comparison};
24use snarkos_node_tcp::{ConnectionSide, P2P, Tcp};
25use snarkvm::{
26 ledger::narwhal::Data,
27 prelude::{Address, ConsensusVersion, Field, Network, block::Header, error},
28};
29
30use anyhow::{Result, bail};
31use futures::SinkExt;
32use rand::{Rng, rngs::OsRng};
33use std::{io, net::SocketAddr};
34use tokio::net::TcpStream;
35use tokio_stream::StreamExt;
36use tokio_util::codec::Framed;
37
38impl<N: Network> P2P for Router<N> {
39 fn tcp(&self) -> &Tcp {
41 &self.tcp
42 }
43}
44
45#[macro_export]
47macro_rules! expect_message {
48 ($msg_ty:path, $framed:expr, $peer_addr:expr) => {
49 match $framed.try_next().await? {
50 Some($msg_ty(data)) => {
52 trace!("Received '{}' from '{}'", data.name(), $peer_addr);
53 data
54 }
55 Some(Message::Disconnect(reason)) => {
57 return Err(error(format!("'{}' disconnected: {reason:?}", $peer_addr)))
58 }
59 Some(ty) => {
61 return Err(error(format!(
62 "'{}' did not follow the handshake protocol: received {:?} instead of {}",
63 $peer_addr,
64 ty.name(),
65 stringify!($msg_ty),
66 )))
67 }
68 None => {
70 return Err(error(format!(
71 "the peer disconnected before sending {:?}, likely due to peer saturation or shutdown",
72 stringify!($msg_ty),
73 )))
74 }
75 }
76 };
77}
78
79async fn send<N: Network>(
81 framed: &mut Framed<&mut TcpStream, MessageCodec<N>>,
82 peer_addr: SocketAddr,
83 message: Message<N>,
84) -> io::Result<()> {
85 trace!("Sending '{}' to '{peer_addr}'", message.name());
86 framed.send(message).await
87}
88
89impl<N: Network> Router<N> {
90 pub async fn handshake<'a>(
92 &'a self,
93 peer_addr: SocketAddr,
94 stream: &'a mut TcpStream,
95 peer_side: ConnectionSide,
96 genesis_header: Header<N>,
97 restrictions_id: Field<N>,
98 ) -> io::Result<Option<ChallengeRequest<N>>> {
99 let mut listener_addr = if peer_side == ConnectionSide::Initiator {
102 debug!("Received a connection request from '{peer_addr}'");
103 None
104 } else {
105 debug!("Shaking hands with '{peer_addr}'...");
106 Some(peer_addr)
107 };
108
109 #[cfg(not(feature = "test"))]
111 if !self.is_dev() && peer_side == ConnectionSide::Initiator {
112 if self.is_ip_banned(peer_addr.ip()) {
114 trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip());
115 return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
116 }
117
118 let num_attempts =
119 self.cache.insert_inbound_connection(peer_addr.ip(), Router::<N>::CONNECTION_ATTEMPTS_SINCE_SECS);
120
121 debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
122 if num_attempts > Router::<N>::MAX_CONNECTION_ATTEMPTS {
123 self.update_ip_ban(peer_addr.ip());
124 trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
125 return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
126 }
127 }
128
129 let handshake_result = if peer_side == ConnectionSide::Responder {
131 self.handshake_inner_initiator(peer_addr, stream, genesis_header, restrictions_id).await
132 } else {
133 self.handshake_inner_responder(peer_addr, &mut listener_addr, stream, genesis_header, restrictions_id).await
134 };
135
136 if let Some(addr) = listener_addr {
137 match handshake_result {
138 Ok(Some(ref cr)) => {
139 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
140 self.resolver.write().insert_peer(peer.listener_addr(), peer_addr, Some(cr.address));
141 peer.upgrade_to_connected(
142 peer_addr,
143 cr.listener_port,
144 cr.address,
145 cr.node_type,
146 cr.version,
147 ConnectionMode::Router,
148 );
149 }
150 #[cfg(feature = "metrics")]
151 self.update_metrics();
152 debug!("Completed the handshake with '{peer_addr}'");
153 }
154 Ok(None) => {
155 return Err(error(format!("Duplicate handshake attempt with '{addr}'")));
156 }
157 Err(_) => {
158 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
159 if peer.is_connecting() {
161 peer.downgrade_to_candidate(addr);
162 }
163 }
164 }
165 }
166 }
167
168 handshake_result
169 }
170
171 async fn handshake_inner_initiator<'a>(
173 &'a self,
174 peer_addr: SocketAddr,
175 stream: &'a mut TcpStream,
176 genesis_header: Header<N>,
177 restrictions_id: Field<N>,
178 ) -> io::Result<Option<ChallengeRequest<N>>> {
179 if !self.add_connecting_peer(peer_addr) {
181 return Ok(None);
183 }
184
185 let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
187
188 let rng = &mut OsRng;
190
191 let current_block_height = self.ledger.latest_block_height();
193 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
194 let snarkos_sha = (consensus_version >= ConsensusVersion::V12)
195 .then(|| built_info::GIT_COMMIT_HASH.unwrap_or_default().into());
196
197 let our_nonce = rng.r#gen();
201 let our_request =
203 ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce, snarkos_sha);
204 send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?;
205
206 let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
210 let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
212
213 if let Some(reason) = self
215 .verify_challenge_response(
216 peer_addr,
217 peer_request.address,
218 peer_request.node_type,
219 peer_response,
220 genesis_header,
221 restrictions_id,
222 our_nonce,
223 )
224 .await
225 {
226 send(&mut framed, peer_addr, reason.into()).await?;
227 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
228 }
229 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
231 send(&mut framed, peer_addr, reason.into()).await?;
232 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
233 }
234
235 let response_nonce: u64 = rng.r#gen();
238 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
239 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
241 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
242 };
243 let our_response = ChallengeResponse {
245 genesis_header,
246 restrictions_id,
247 signature: Data::Object(our_signature),
248 nonce: response_nonce,
249 };
250 send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
251
252 Ok(Some(peer_request))
253 }
254
255 async fn handshake_inner_responder<'a>(
257 &'a self,
258 peer_addr: SocketAddr,
259 listener_addr: &mut Option<SocketAddr>,
260 stream: &'a mut TcpStream,
261 genesis_header: Header<N>,
262 restrictions_id: Field<N>,
263 ) -> io::Result<Option<ChallengeRequest<N>>> {
264 let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
266
267 let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
271
272 let current_block_height = self.ledger.latest_block_height();
274 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
275 let snarkos_sha = (consensus_version >= ConsensusVersion::V12)
276 .then(|| built_info::GIT_COMMIT_HASH.unwrap_or_default().into());
277
278 *listener_addr = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
280 let listener_addr = listener_addr.unwrap();
281
282 if let Err(forbidden_message) = self.ensure_peer_is_allowed(listener_addr) {
284 return Err(error(format!("{forbidden_message}")));
285 }
286
287 if !self.add_connecting_peer(listener_addr) {
289 return Ok(None);
291 }
292
293 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
295 send(&mut framed, peer_addr, reason.into()).await?;
296 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
297 }
298
299 let rng = &mut OsRng;
303
304 let response_nonce: u64 = rng.r#gen();
306 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
307 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
308 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
309 };
310 let our_response = ChallengeResponse {
312 genesis_header,
313 restrictions_id,
314 signature: Data::Object(our_signature),
315 nonce: response_nonce,
316 };
317 send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
318
319 let our_nonce = rng.r#gen();
321 let our_request =
323 ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce, snarkos_sha);
324 send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?;
325
326 let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
330 if let Some(reason) = self
332 .verify_challenge_response(
333 peer_addr,
334 peer_request.address,
335 peer_request.node_type,
336 peer_response,
337 genesis_header,
338 restrictions_id,
339 our_nonce,
340 )
341 .await
342 {
343 send(&mut framed, peer_addr, reason.into()).await?;
344 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
345 }
346
347 Ok(Some(peer_request))
348 }
349
350 fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> {
352 if self.is_local_ip(listener_addr) {
354 bail!("Dropping connection request from '{listener_addr}' (attempted to self-connect)");
355 }
356 if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
358 bail!("Dropping connection request from '{listener_addr}' (untrusted)");
359 }
360 Ok(())
361 }
362
363 fn verify_challenge_request(
365 &self,
366 peer_addr: SocketAddr,
367 message: &ChallengeRequest<N>,
368 ) -> Option<DisconnectReason> {
369 let &ChallengeRequest { version, listener_port: _, node_type, address, nonce: _, ref snarkos_sha } = message;
371 log_repo_sha_comparison(peer_addr, snarkos_sha.as_ref(), Self::OWNER);
372
373 if !self.is_valid_message_version(version) {
375 warn!("Dropping '{peer_addr}' on version {version} (outdated)");
376 return Some(DisconnectReason::OutdatedClientVersion);
377 }
378
379 if self.node_type() == NodeType::Validator
381 && node_type == NodeType::Validator
382 && self.is_connected_address(address)
383 {
384 warn!("Dropping '{peer_addr}' for being already connected ({address})");
385 return Some(DisconnectReason::NoReasonGiven);
386 }
387
388 None
389 }
390
391 #[allow(clippy::too_many_arguments)]
393 async fn verify_challenge_response(
394 &self,
395 peer_addr: SocketAddr,
396 peer_address: Address<N>,
397 peer_node_type: NodeType,
398 response: ChallengeResponse<N>,
399 expected_genesis_header: Header<N>,
400 expected_restrictions_id: Field<N>,
401 expected_nonce: u64,
402 ) -> Option<DisconnectReason> {
403 let ChallengeResponse { genesis_header, restrictions_id, signature, nonce } = response;
405
406 if genesis_header != expected_genesis_header {
408 warn!("Handshake with '{peer_addr}' failed (incorrect block header)");
409 return Some(DisconnectReason::InvalidChallengeResponse);
410 }
411 if !peer_node_type.is_prover() && !self.node_type.is_prover() && restrictions_id != expected_restrictions_id {
413 warn!("Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
414 return Some(DisconnectReason::InvalidChallengeResponse);
415 }
416 let Ok(signature) = signature.deserialize().await else {
418 warn!("Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
419 return Some(DisconnectReason::InvalidChallengeResponse);
420 };
421 if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
423 warn!("Handshake with '{peer_addr}' failed (invalid signature)");
424 return Some(DisconnectReason::InvalidChallengeResponse);
425 }
426 None
427 }
428}