1use crate::{
17 ConnectionMode,
18 NodeType,
19 PeerPoolHandling,
20 Router,
21 messages::{ChallengeRequest, ChallengeResponse, DisconnectReason, Message, MessageCodec, MessageTrait},
22};
23use snarkos_node_network::{get_repo_commit_hash, log_repo_sha_comparison};
24use snarkos_node_tcp::{ConnectError, ConnectionSide, P2P, Tcp};
25use snarkvm::{
26 ledger::narwhal::Data,
27 prelude::{Address, ConsensusVersion, Field, Network, block::Header},
28};
29
30use anyhow::{Result, anyhow};
31use futures::SinkExt;
32
33use std::{io, net::SocketAddr};
34use tokio::net::TcpStream;
35use tokio_stream::StreamExt;
36use tokio_util::codec::Framed;
37
38impl<N: Network> P2P for Router<N> {
39 fn tcp(&self) -> &Tcp {
41 &self.tcp
42 }
43}
44
/// Awaits the next frame from `$framed` and unwraps it as the expected handshake message.
///
/// Evaluates to the payload of the `$msg_ty` variant. In every other case the macro `return`s
/// an `Err(ConnectError)` from the enclosing function:
/// - the stream yielded no further message (reported as a broken pipe),
/// - the peer sent a `Disconnect` (its reason is included in the error),
/// - the peer sent any other message (reported as a handshake protocol violation).
///
/// The expansion uses `.await?` and refers to `trace!`, `Message`, `ConnectError` and `io` by
/// their unqualified names, so it must be invoked inside an `async` function that returns a
/// compatible `Result` and has those names in scope.
#[macro_export]
macro_rules! expect_message {
    ($msg_ty:path, $framed:expr, $peer_addr:expr) => {{
        match $framed.try_next().await? {
            // The stream ended before the expected message arrived.
            None => return Err(ConnectError::IoError(io::ErrorKind::BrokenPipe.into())),
            // The expected message arrived; hand its payload back to the caller.
            Some($msg_ty(payload)) => {
                trace!("Received '{}' from '{}'", payload.name(), $peer_addr);
                payload
            }
            // The peer aborted the handshake on its own.
            Some(Message::Disconnect($crate::messages::Disconnect { reason })) => {
                return Err(ConnectError::other(format!("'{}' disconnected with reason \"{reason}\"", $peer_addr)));
            }
            // Any other message is a violation of the handshake protocol.
            Some(unexpected) => {
                return Err(ConnectError::other(format!(
                    "'{}' did not follow the handshake protocol: received {:?} instead of {}",
                    $peer_addr,
                    unexpected.name(),
                    stringify!($msg_ty),
                )));
            }
        }
    }};
}
73
74async fn send<N: Network>(
76 framed: &mut Framed<&mut TcpStream, MessageCodec<N>>,
77 peer_addr: SocketAddr,
78 message: Message<N>,
79) -> io::Result<()> {
80 trace!("Sending '{}' to '{peer_addr}'", message.name());
81 framed.send(message).await
82}
83
84impl<N: Network> Router<N> {
85 pub async fn handshake<'a>(
87 &'a self,
88 peer_addr: SocketAddr,
89 stream: &'a mut TcpStream,
90 peer_side: ConnectionSide,
91 genesis_header: Header<N>,
92 restrictions_id: Field<N>,
93 ) -> Result<ChallengeRequest<N>, ConnectError> {
94 let mut listener_addr = if peer_side == ConnectionSide::Initiator {
97 debug!("Received a connection request from '{peer_addr}'");
98 None
99 } else {
100 debug!("Shaking hands with '{peer_addr}'...");
101 Some(peer_addr)
102 };
103
104 #[cfg(not(feature = "test"))]
106 if !self.is_dev() && peer_side == ConnectionSide::Initiator {
107 if self.is_ip_banned(peer_addr.ip()) {
109 trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip());
110 return Err(ConnectError::other(anyhow!("'{}' is a banned IP address", peer_addr.ip())));
111 }
112
113 let num_attempts =
114 self.cache.insert_inbound_connection(peer_addr.ip(), Router::<N>::CONNECTION_ATTEMPTS_SINCE_SECS);
115
116 debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
117 if num_attempts > Router::<N>::MAX_CONNECTION_ATTEMPTS {
118 self.update_ip_ban(peer_addr.ip());
119 trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
120 return Err(ConnectError::other(anyhow!("'{}' appears to be spamming connections", peer_addr.ip())));
121 }
122 }
123
124 let handshake_result = match peer_side {
126 ConnectionSide::Responder => {
127 self.handshake_inner_initiator(peer_addr, stream, genesis_header, restrictions_id).await
128 }
129 ConnectionSide::Initiator => {
130 self.handshake_inner_responder(peer_addr, &mut listener_addr, stream, genesis_header, restrictions_id)
131 .await
132 }
133 };
134
135 if let Some(addr) = listener_addr {
136 match handshake_result {
137 Ok(ref cr) => {
138 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
139 self.resolver.write().insert_peer(peer.listener_addr(), peer_addr, Some(cr.address));
140 peer.upgrade_to_connected(
141 peer_addr,
142 cr.listener_port,
143 cr.address,
144 cr.node_type,
145 cr.version,
146 cr.snarkos_sha,
147 ConnectionMode::Router,
148 );
149 }
150
151 #[cfg(feature = "metrics")]
152 self.update_metrics();
153 }
154 Err(_) => {
155 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
156 if peer.is_connecting() {
158 peer.downgrade_to_candidate(addr);
159 }
160 }
161 }
162 }
163 }
164
165 handshake_result
166 }
167
168 async fn handshake_inner_initiator<'a>(
170 &'a self,
171 peer_addr: SocketAddr,
172 stream: &'a mut TcpStream,
173 genesis_header: Header<N>,
174 restrictions_id: Field<N>,
175 ) -> Result<ChallengeRequest<N>, ConnectError> {
176 self.add_connecting_peer(peer_addr)?;
179
180 let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
182
183 let current_block_height = self.ledger.latest_block_height();
185 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
186 let snarkos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
187 (true, Some(sha)) => Some(sha),
188 _ => None,
189 };
190
191 let our_nonce: u64 = rand::random();
195 let our_request =
197 ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce, snarkos_sha);
198 send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?;
199
200 let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
204 let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
206
207 if let Some(reason) = self
209 .verify_challenge_response(
210 peer_addr,
211 peer_request.address,
212 peer_request.node_type,
213 peer_response,
214 genesis_header,
215 restrictions_id,
216 our_nonce,
217 )
218 .await
219 {
220 send(&mut framed, peer_addr, reason.into()).await?;
221 return Err(reason.into_connect_error(peer_addr));
222 }
223
224 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
226 send(&mut framed, peer_addr, reason.into()).await?;
227 return Err(reason.into_connect_error(peer_addr));
228 }
229
230 let response_nonce: u64 = rand::random();
233 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
234 let Ok(our_signature) = self.account.sign_bytes(&data, &mut rand::rng()) else {
236 return Err(ConnectError::other(anyhow!("Failed to sign the challenge request nonce")));
237 };
238 let our_response = ChallengeResponse {
240 genesis_header,
241 restrictions_id,
242 signature: Data::Object(our_signature),
243 nonce: response_nonce,
244 };
245 send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
246
247 Ok(peer_request)
248 }
249
250 async fn handshake_inner_responder<'a>(
252 &'a self,
253 peer_addr: SocketAddr,
254 listener_addr: &mut Option<SocketAddr>,
255 stream: &'a mut TcpStream,
256 genesis_header: Header<N>,
257 restrictions_id: Field<N>,
258 ) -> Result<ChallengeRequest<N>, ConnectError> {
259 let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
261
262 let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
266
267 let current_block_height = self.ledger.latest_block_height();
269 let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
270 let snarkos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
271 (true, Some(sha)) => Some(sha),
272 _ => None,
273 };
274
275 *listener_addr = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
277 let listener_addr = listener_addr.unwrap();
278
279 if let Err(reason) = self.ensure_peer_is_allowed(listener_addr) {
281 send(&mut framed, peer_addr, reason.into()).await?;
282 return Err(reason.into_connect_error(listener_addr));
283 }
284
285 self.add_connecting_peer(listener_addr)?;
287
288 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
290 send(&mut framed, peer_addr, reason.into()).await?;
291 return Err(reason.into_connect_error(peer_addr));
292 }
293
294 let response_nonce: u64 = rand::random();
298 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
299 let Ok(our_signature) = self.account.sign_bytes(&data, &mut rand::rng()) else {
300 return Err(ConnectError::Other(
301 anyhow!("Failed to sign the challenge request nonce from '{peer_addr}'").into(),
302 ));
303 };
304 let our_response = ChallengeResponse {
306 genesis_header,
307 restrictions_id,
308 signature: Data::Object(our_signature),
309 nonce: response_nonce,
310 };
311 send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
312
313 let our_nonce: u64 = rand::random();
315 let our_request =
317 ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce, snarkos_sha);
318 send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?;
319
320 let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
324
325 if let Some(reason) = self
327 .verify_challenge_response(
328 peer_addr,
329 peer_request.address,
330 peer_request.node_type,
331 peer_response,
332 genesis_header,
333 restrictions_id,
334 our_nonce,
335 )
336 .await
337 {
338 send(&mut framed, peer_addr, reason.into()).await?;
339 Err(reason.into_connect_error(peer_addr))
340 } else {
341 Ok(peer_request)
342 }
343 }
344
345 fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<(), DisconnectReason> {
347 if self.is_local_ip(listener_addr) {
349 return Err(DisconnectReason::SelfConnect);
350 }
351 if self.node_type() == NodeType::Validator
353 && !self.is_trusted(listener_addr)
354 && !crate::bootstrap_peers::<N>(self.is_dev()).contains(&listener_addr)
355 {
356 return Err(DisconnectReason::NoExternalPeersAllowed);
357 }
358 if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
360 return Err(DisconnectReason::NoExternalPeersAllowed);
361 }
362
363 Ok(())
364 }
365
366 fn verify_challenge_request(
368 &self,
369 peer_addr: SocketAddr,
370 message: &ChallengeRequest<N>,
371 ) -> Option<DisconnectReason> {
372 let &ChallengeRequest { version, listener_port: _, node_type, address, nonce: _, ref snarkos_sha } = message;
374 log_repo_sha_comparison(peer_addr, snarkos_sha, Self::OWNER);
375
376 if !self.is_valid_message_version(version) {
378 warn!("Dropping '{peer_addr}' on version {version} (outdated)");
379 return Some(DisconnectReason::OutdatedClientVersion);
380 }
381
382 if self.node_type() == NodeType::Validator
384 && node_type == NodeType::Validator
385 && self.is_connected_address(address)
386 {
387 warn!("Dropping '{peer_addr}' for being already connected ({address})");
388 return Some(DisconnectReason::NoReasonGiven);
389 }
390
391 None
392 }
393
394 #[allow(clippy::too_many_arguments)]
396 async fn verify_challenge_response(
397 &self,
398 peer_addr: SocketAddr,
399 peer_address: Address<N>,
400 peer_node_type: NodeType,
401 response: ChallengeResponse<N>,
402 expected_genesis_header: Header<N>,
403 expected_restrictions_id: Field<N>,
404 expected_nonce: u64,
405 ) -> Option<DisconnectReason> {
406 let ChallengeResponse { genesis_header, restrictions_id, signature, nonce } = response;
408
409 if genesis_header != expected_genesis_header {
411 warn!("Handshake with '{peer_addr}' failed (incorrect block header)");
412 return Some(DisconnectReason::InvalidChallengeResponse);
413 }
414 if !peer_node_type.is_prover() && !self.node_type.is_prover() && restrictions_id != expected_restrictions_id {
416 warn!("Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
417 return Some(DisconnectReason::InvalidChallengeResponse);
418 }
419 let Ok(signature) = signature.deserialize().await else {
421 warn!("Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
422 return Some(DisconnectReason::InvalidChallengeResponse);
423 };
424 if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
426 warn!("Handshake with '{peer_addr}' failed (invalid signature)");
427 return Some(DisconnectReason::InvalidChallengeResponse);
428 }
429 None
430 }
431}