1use crate::{
17 ConnectionMode,
18 NodeType,
19 PeerPoolHandling,
20 Router,
21 messages::{ChallengeRequest, ChallengeResponse, DisconnectReason, Message, MessageCodec, MessageTrait},
22};
23use snarkos_node_network::{get_repo_commit_hash, log_repo_sha_comparison};
24use snarkos_node_tcp::{ConnectError, ConnectionSide, P2P, Tcp};
25use snarkvm::{
26 ledger::narwhal::Data,
27 prelude::{Address, ConsensusVersion, Field, Network, block::Header},
28};
29
30use anyhow::{Result, anyhow};
31use futures::SinkExt;
32use rand::{Rng, rngs::OsRng};
33use std::{io, net::SocketAddr};
34use tokio::net::TcpStream;
35use tokio_stream::StreamExt;
36use tokio_util::codec::Framed;
37
38impl<N: Network> P2P for Router<N> {
39 fn tcp(&self) -> &Tcp {
41 &self.tcp
42 }
43}
44
45#[macro_export]
47macro_rules! expect_message {
48 ($msg_ty:path, $framed:expr, $peer_addr:expr) => {{
49 match $framed.try_next().await? {
50 Some($msg_ty(data)) => {
52 trace!("Received '{}' from '{}'", data.name(), $peer_addr);
53 data
54 }
55 Some(Message::Disconnect($crate::messages::Disconnect { reason })) => {
57 return Err(ConnectError::other(format!("'{}' disconnected with reason \"{reason}\"", $peer_addr)));
58 }
59 Some(ty) => {
61 return Err(ConnectError::other(format!(
62 "'{}' did not follow the handshake protocol: received {:?} instead of {}",
63 $peer_addr,
64 ty.name(),
65 stringify!($msg_ty),
66 )));
67 }
68 None => return Err(ConnectError::IoError(io::ErrorKind::BrokenPipe.into())),
70 }
71 }};
72}
73
74async fn send<N: Network>(
76 framed: &mut Framed<&mut TcpStream, MessageCodec<N>>,
77 peer_addr: SocketAddr,
78 message: Message<N>,
79) -> io::Result<()> {
80 trace!("Sending '{}' to '{peer_addr}'", message.name());
81 framed.send(message).await
82}
83
84impl<N: Network> Router<N> {
85 pub async fn handshake<'a>(
87 &'a self,
88 peer_addr: SocketAddr,
89 stream: &'a mut TcpStream,
90 peer_side: ConnectionSide,
91 genesis_header: Header<N>,
92 restrictions_id: Field<N>,
93 ) -> Result<ChallengeRequest<N>, ConnectError> {
94 let mut listener_addr = if peer_side == ConnectionSide::Initiator {
97 debug!("Received a connection request from '{peer_addr}'");
98 None
99 } else {
100 debug!("Shaking hands with '{peer_addr}'...");
101 Some(peer_addr)
102 };
103
104 #[cfg(not(feature = "test"))]
106 if !self.is_dev() && peer_side == ConnectionSide::Initiator {
107 if self.is_ip_banned(peer_addr.ip()) {
109 trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip());
110 return Err(ConnectError::other(anyhow!("'{}' is a banned IP address", peer_addr.ip())));
111 }
112
113 let num_attempts =
114 self.cache.insert_inbound_connection(peer_addr.ip(), Router::<N>::CONNECTION_ATTEMPTS_SINCE_SECS);
115
116 debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
117 if num_attempts > Router::<N>::MAX_CONNECTION_ATTEMPTS {
118 self.update_ip_ban(peer_addr.ip());
119 trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
120 return Err(ConnectError::other(anyhow!("'{}' appears to be spamming connections", peer_addr.ip())));
121 }
122 }
123
124 let handshake_result = match peer_side {
126 ConnectionSide::Responder => {
127 self.handshake_inner_initiator(peer_addr, stream, genesis_header, restrictions_id).await
128 }
129 ConnectionSide::Initiator => {
130 self.handshake_inner_responder(peer_addr, &mut listener_addr, stream, genesis_header, restrictions_id)
131 .await
132 }
133 };
134
135 if let Some(addr) = listener_addr {
136 match handshake_result {
137 Ok(ref cr) => {
138 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
139 self.resolver.write().insert_peer(peer.listener_addr(), peer_addr, Some(cr.address));
140 peer.upgrade_to_connected(
141 peer_addr,
142 cr.listener_port,
143 cr.address,
144 cr.node_type,
145 cr.version,
146 cr.snarkos_sha,
147 ConnectionMode::Router,
148 );
149 }
150
151 #[cfg(feature = "metrics")]
152 self.update_metrics();
153 }
154 Err(_) => {
155 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
156 if peer.is_connecting() {
158 peer.downgrade_to_candidate(addr);
159 }
160 }
161 }
162 }
163 }
164
165 handshake_result
166 }
167
168 async fn handshake_inner_initiator<'a>(
170 &'a self,
171 peer_addr: SocketAddr,
172 stream: &'a mut TcpStream,
173 genesis_header: Header<N>,
174 restrictions_id: Field<N>,
175 ) -> Result<ChallengeRequest<N>, ConnectError> {
176 self.add_connecting_peer(peer_addr)?;
179
180 let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
182
183 let rng = &mut OsRng;
185
186 let current_block_height = self.ledger.latest_block_height();
188 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
189 let snarkos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
190 (true, Some(sha)) => Some(sha),
191 _ => None,
192 };
193
194 let our_nonce = rng.r#gen();
198 let our_request =
200 ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce, snarkos_sha);
201 send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?;
202
203 let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
207 let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
209
210 if let Some(reason) = self
212 .verify_challenge_response(
213 peer_addr,
214 peer_request.address,
215 peer_request.node_type,
216 peer_response,
217 genesis_header,
218 restrictions_id,
219 our_nonce,
220 )
221 .await
222 {
223 send(&mut framed, peer_addr, reason.into()).await?;
224 return Err(reason.into_connect_error(peer_addr));
225 }
226
227 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
229 send(&mut framed, peer_addr, reason.into()).await?;
230 return Err(reason.into_connect_error(peer_addr));
231 }
232
233 let response_nonce: u64 = rng.r#gen();
236 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
237 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
239 return Err(ConnectError::other(anyhow!("Failed to sign the challenge request nonce")));
240 };
241 let our_response = ChallengeResponse {
243 genesis_header,
244 restrictions_id,
245 signature: Data::Object(our_signature),
246 nonce: response_nonce,
247 };
248 send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
249
250 Ok(peer_request)
251 }
252
253 async fn handshake_inner_responder<'a>(
255 &'a self,
256 peer_addr: SocketAddr,
257 listener_addr: &mut Option<SocketAddr>,
258 stream: &'a mut TcpStream,
259 genesis_header: Header<N>,
260 restrictions_id: Field<N>,
261 ) -> Result<ChallengeRequest<N>, ConnectError> {
262 let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
264
265 let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
269
270 let current_block_height = self.ledger.latest_block_height();
272 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
273 let snarkos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
274 (true, Some(sha)) => Some(sha),
275 _ => None,
276 };
277
278 *listener_addr = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
280 let listener_addr = listener_addr.unwrap();
281
282 if let Err(reason) = self.ensure_peer_is_allowed(listener_addr) {
284 send(&mut framed, peer_addr, reason.into()).await?;
285 return Err(reason.into_connect_error(listener_addr));
286 }
287
288 self.add_connecting_peer(listener_addr)?;
290
291 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
293 send(&mut framed, peer_addr, reason.into()).await?;
294 return Err(reason.into_connect_error(peer_addr));
295 }
296
297 let rng = &mut OsRng;
301
302 let response_nonce: u64 = rng.r#gen();
304 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
305 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
306 return Err(ConnectError::Other(
307 anyhow!("Failed to sign the challenge request nonce from '{peer_addr}'").into(),
308 ));
309 };
310 let our_response = ChallengeResponse {
312 genesis_header,
313 restrictions_id,
314 signature: Data::Object(our_signature),
315 nonce: response_nonce,
316 };
317 send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
318
319 let our_nonce = rng.r#gen();
321 let our_request =
323 ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce, snarkos_sha);
324 send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?;
325
326 let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
330
331 if let Some(reason) = self
333 .verify_challenge_response(
334 peer_addr,
335 peer_request.address,
336 peer_request.node_type,
337 peer_response,
338 genesis_header,
339 restrictions_id,
340 our_nonce,
341 )
342 .await
343 {
344 send(&mut framed, peer_addr, reason.into()).await?;
345 Err(reason.into_connect_error(peer_addr))
346 } else {
347 Ok(peer_request)
348 }
349 }
350
351 fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<(), DisconnectReason> {
353 if self.is_local_ip(listener_addr) {
355 return Err(DisconnectReason::SelfConnect);
356 }
357 if self.node_type() == NodeType::Validator
359 && !self.is_trusted(listener_addr)
360 && !crate::bootstrap_peers::<N>(self.is_dev()).contains(&listener_addr)
361 {
362 return Err(DisconnectReason::NoExternalPeersAllowed);
363 }
364 if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
366 return Err(DisconnectReason::NoExternalPeersAllowed);
367 }
368
369 Ok(())
370 }
371
372 fn verify_challenge_request(
374 &self,
375 peer_addr: SocketAddr,
376 message: &ChallengeRequest<N>,
377 ) -> Option<DisconnectReason> {
378 let &ChallengeRequest { version, listener_port: _, node_type, address, nonce: _, ref snarkos_sha } = message;
380 log_repo_sha_comparison(peer_addr, snarkos_sha, Self::OWNER);
381
382 if !self.is_valid_message_version(version) {
384 warn!("Dropping '{peer_addr}' on version {version} (outdated)");
385 return Some(DisconnectReason::OutdatedClientVersion);
386 }
387
388 if self.node_type() == NodeType::Validator
390 && node_type == NodeType::Validator
391 && self.is_connected_address(address)
392 {
393 warn!("Dropping '{peer_addr}' for being already connected ({address})");
394 return Some(DisconnectReason::NoReasonGiven);
395 }
396
397 None
398 }
399
400 #[allow(clippy::too_many_arguments)]
402 async fn verify_challenge_response(
403 &self,
404 peer_addr: SocketAddr,
405 peer_address: Address<N>,
406 peer_node_type: NodeType,
407 response: ChallengeResponse<N>,
408 expected_genesis_header: Header<N>,
409 expected_restrictions_id: Field<N>,
410 expected_nonce: u64,
411 ) -> Option<DisconnectReason> {
412 let ChallengeResponse { genesis_header, restrictions_id, signature, nonce } = response;
414
415 if genesis_header != expected_genesis_header {
417 warn!("Handshake with '{peer_addr}' failed (incorrect block header)");
418 return Some(DisconnectReason::InvalidChallengeResponse);
419 }
420 if !peer_node_type.is_prover() && !self.node_type.is_prover() && restrictions_id != expected_restrictions_id {
422 warn!("Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
423 return Some(DisconnectReason::InvalidChallengeResponse);
424 }
425 let Ok(signature) = signature.deserialize().await else {
427 warn!("Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
428 return Some(DisconnectReason::InvalidChallengeResponse);
429 };
430 if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
432 warn!("Handshake with '{peer_addr}' failed (invalid signature)");
433 return Some(DisconnectReason::InvalidChallengeResponse);
434 }
435 None
436 }
437}