1use crate::{
17 NodeType,
18 PeerPoolHandling,
19 Router,
20 messages::{ChallengeRequest, ChallengeResponse, DisconnectReason, Message, MessageCodec, MessageTrait},
21};
22use snarkos_node_tcp::{ConnectionSide, P2P, Tcp};
23use snarkvm::{
24 ledger::narwhal::Data,
25 prelude::{Address, Field, Network, block::Header, error},
26};
27
28use anyhow::{Result, bail};
29use futures::SinkExt;
30use rand::{Rng, rngs::OsRng};
31use std::{io, net::SocketAddr};
32use tokio::net::TcpStream;
33use tokio_stream::StreamExt;
34use tokio_util::codec::Framed;
35
36impl<N: Network> P2P for Router<N> {
37 fn tcp(&self) -> &Tcp {
39 &self.tcp
40 }
41}
42
43#[macro_export]
45macro_rules! expect_message {
46 ($msg_ty:path, $framed:expr, $peer_addr:expr) => {
47 match $framed.try_next().await? {
48 Some($msg_ty(data)) => {
50 trace!("Received '{}' from '{}'", data.name(), $peer_addr);
51 data
52 }
53 Some(Message::Disconnect(reason)) => {
55 return Err(error(format!("'{}' disconnected: {reason:?}", $peer_addr)))
56 }
57 Some(ty) => {
59 return Err(error(format!(
60 "'{}' did not follow the handshake protocol: received {:?} instead of {}",
61 $peer_addr,
62 ty.name(),
63 stringify!($msg_ty),
64 )))
65 }
66 None => {
68 return Err(error(format!(
69 "the peer disconnected before sending {:?}, likely due to peer saturation or shutdown",
70 stringify!($msg_ty),
71 )))
72 }
73 }
74 };
75}
76
77async fn send<N: Network>(
79 framed: &mut Framed<&mut TcpStream, MessageCodec<N>>,
80 peer_addr: SocketAddr,
81 message: Message<N>,
82) -> io::Result<()> {
83 trace!("Sending '{}' to '{peer_addr}'", message.name());
84 framed.send(message).await
85}
86
87impl<N: Network> Router<N> {
88 pub async fn handshake<'a>(
90 &'a self,
91 peer_addr: SocketAddr,
92 stream: &'a mut TcpStream,
93 peer_side: ConnectionSide,
94 genesis_header: Header<N>,
95 restrictions_id: Field<N>,
96 ) -> io::Result<Option<ChallengeRequest<N>>> {
97 let mut listener_addr = if peer_side == ConnectionSide::Initiator {
100 debug!("Received a connection request from '{peer_addr}'");
101 None
102 } else {
103 debug!("Shaking hands with '{peer_addr}'...");
104 Some(peer_addr)
105 };
106
107 #[cfg(not(feature = "test"))]
109 if !self.is_dev() && peer_side == ConnectionSide::Initiator {
110 if self.is_ip_banned(peer_addr.ip()) {
112 trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip());
113 return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
114 }
115
116 let num_attempts =
117 self.cache.insert_inbound_connection(peer_addr.ip(), Router::<N>::CONNECTION_ATTEMPTS_SINCE_SECS);
118
119 debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
120 if num_attempts > Router::<N>::MAX_CONNECTION_ATTEMPTS {
121 self.update_ip_ban(peer_addr.ip());
122 trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
123 return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
124 }
125 }
126
127 let handshake_result = if peer_side == ConnectionSide::Responder {
129 self.handshake_inner_initiator(peer_addr, stream, genesis_header, restrictions_id).await
130 } else {
131 self.handshake_inner_responder(peer_addr, &mut listener_addr, stream, genesis_header, restrictions_id).await
132 };
133
134 if let Some(addr) = listener_addr {
135 match handshake_result {
136 Ok(Some(ref cr)) => {
137 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
138 self.resolver.write().insert_peer(peer.listener_addr(), peer_addr, None);
139 peer.upgrade_to_connected(peer_addr, cr.listener_port, cr.address, cr.node_type, cr.version);
140 }
141 #[cfg(feature = "metrics")]
142 self.update_metrics();
143 debug!("Completed the handshake with '{peer_addr}'");
144 }
145 Ok(None) => {
146 return Err(error("Duplicate handshake attempt with '{addr}'"));
147 }
148 Err(_) => {
149 if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
150 peer.downgrade_to_candidate(addr);
151 }
152 }
153 }
154 }
155
156 handshake_result
157 }
158
159 async fn handshake_inner_initiator<'a>(
161 &'a self,
162 peer_addr: SocketAddr,
163 stream: &'a mut TcpStream,
164 genesis_header: Header<N>,
165 restrictions_id: Field<N>,
166 ) -> io::Result<Option<ChallengeRequest<N>>> {
167 if !self.add_connecting_peer(peer_addr) {
169 return Ok(None);
171 }
172
173 let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
175
176 let rng = &mut OsRng;
178
179 let our_nonce = rng.r#gen();
183 let our_request = ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce);
185 send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?;
186
187 let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
191 let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
193
194 if let Some(reason) = self
196 .verify_challenge_response(
197 peer_addr,
198 peer_request.address,
199 peer_request.node_type,
200 peer_response,
201 genesis_header,
202 restrictions_id,
203 our_nonce,
204 )
205 .await
206 {
207 send(&mut framed, peer_addr, reason.into()).await?;
208 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
209 }
210 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
212 send(&mut framed, peer_addr, reason.into()).await?;
213 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
214 }
215
216 let response_nonce: u64 = rng.r#gen();
219 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
220 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
222 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
223 };
224 let our_response = ChallengeResponse {
226 genesis_header,
227 restrictions_id,
228 signature: Data::Object(our_signature),
229 nonce: response_nonce,
230 };
231 send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
232
233 Ok(Some(peer_request))
234 }
235
236 async fn handshake_inner_responder<'a>(
238 &'a self,
239 peer_addr: SocketAddr,
240 listener_addr: &mut Option<SocketAddr>,
241 stream: &'a mut TcpStream,
242 genesis_header: Header<N>,
243 restrictions_id: Field<N>,
244 ) -> io::Result<Option<ChallengeRequest<N>>> {
245 let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
247
248 let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
252
253 *listener_addr = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
255 let listener_addr = listener_addr.unwrap();
256
257 if let Err(forbidden_message) = self.ensure_peer_is_allowed(listener_addr) {
259 return Err(error(format!("{forbidden_message}")));
260 }
261
262 if !self.add_connecting_peer(listener_addr) {
264 return Ok(None);
266 }
267
268 if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
270 send(&mut framed, peer_addr, reason.into()).await?;
271 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
272 }
273
274 let rng = &mut OsRng;
278
279 let response_nonce: u64 = rng.r#gen();
281 let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
282 let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
283 return Err(error(format!("Failed to sign the challenge request nonce from '{peer_addr}'")));
284 };
285 let our_response = ChallengeResponse {
287 genesis_header,
288 restrictions_id,
289 signature: Data::Object(our_signature),
290 nonce: response_nonce,
291 };
292 send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
293
294 let our_nonce = rng.r#gen();
296 let our_request = ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce);
298 send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?;
299
300 let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
304 if let Some(reason) = self
306 .verify_challenge_response(
307 peer_addr,
308 peer_request.address,
309 peer_request.node_type,
310 peer_response,
311 genesis_header,
312 restrictions_id,
313 our_nonce,
314 )
315 .await
316 {
317 send(&mut framed, peer_addr, reason.into()).await?;
318 return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
319 }
320
321 Ok(Some(peer_request))
322 }
323
324 fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> {
326 if self.is_local_ip(listener_addr) {
328 bail!("Dropping connection request from '{listener_addr}' (attempted to self-connect)");
329 }
330 if !self.allow_external_peers() && !self.is_trusted(listener_addr) {
332 bail!("Dropping connection request from '{listener_addr}' (untrusted)");
333 }
334 Ok(())
335 }
336
337 fn verify_challenge_request(
339 &self,
340 peer_addr: SocketAddr,
341 message: &ChallengeRequest<N>,
342 ) -> Option<DisconnectReason> {
343 let &ChallengeRequest { version, listener_port: _, node_type: _, address: _, nonce: _ } = message;
345
346 if !self.is_valid_message_version(version) {
348 warn!("Dropping '{peer_addr}' on version {version} (outdated)");
349 return Some(DisconnectReason::OutdatedClientVersion);
350 }
351 None
352 }
353
354 #[allow(clippy::too_many_arguments)]
356 async fn verify_challenge_response(
357 &self,
358 peer_addr: SocketAddr,
359 peer_address: Address<N>,
360 peer_node_type: NodeType,
361 response: ChallengeResponse<N>,
362 expected_genesis_header: Header<N>,
363 expected_restrictions_id: Field<N>,
364 expected_nonce: u64,
365 ) -> Option<DisconnectReason> {
366 let ChallengeResponse { genesis_header, restrictions_id, signature, nonce } = response;
368
369 if genesis_header != expected_genesis_header {
371 warn!("Handshake with '{peer_addr}' failed (incorrect block header)");
372 return Some(DisconnectReason::InvalidChallengeResponse);
373 }
374 if !peer_node_type.is_prover() && !self.node_type.is_prover() && restrictions_id != expected_restrictions_id {
376 warn!("Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
377 return Some(DisconnectReason::InvalidChallengeResponse);
378 }
379 let Ok(signature) = signature.deserialize().await else {
381 warn!("Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
382 return Some(DisconnectReason::InvalidChallengeResponse);
383 };
384 if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
386 warn!("Handshake with '{peer_addr}' failed (invalid signature)");
387 return Some(DisconnectReason::InvalidChallengeResponse);
388 }
389 None
390 }
391}