Skip to main content

snarkos_node_router/
handshake.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    ConnectionMode,
18    NodeType,
19    PeerPoolHandling,
20    Router,
21    messages::{ChallengeRequest, ChallengeResponse, DisconnectReason, Message, MessageCodec, MessageTrait},
22};
23use snarkos_node_network::{get_repo_commit_hash, log_repo_sha_comparison};
24use snarkos_node_tcp::{ConnectError, ConnectionSide, P2P, Tcp};
25use snarkvm::{
26    ledger::narwhal::Data,
27    prelude::{Address, ConsensusVersion, Field, Network, block::Header},
28};
29
30use anyhow::{Result, anyhow};
31use futures::SinkExt;
32use rand::{Rng, rngs::OsRng};
33use std::{io, net::SocketAddr};
34use tokio::net::TcpStream;
35use tokio_stream::StreamExt;
36use tokio_util::codec::Framed;
37
38impl<N: Network> P2P for Router<N> {
39    /// Returns a reference to the TCP instance.
40    fn tcp(&self) -> &Tcp {
41        &self.tcp
42    }
43}
44
45/// A macro unwrapping the expected handshake message or returning an error for unexpected messages.
46#[macro_export]
47macro_rules! expect_message {
48    ($msg_ty:path, $framed:expr, $peer_addr:expr) => {{
49        match $framed.try_next().await? {
50            // Received the expected message, proceed.
51            Some($msg_ty(data)) => {
52                trace!("Received '{}' from '{}'", data.name(), $peer_addr);
53                data
54            }
55            // Received a disconnect message, abort.
56            Some(Message::Disconnect($crate::messages::Disconnect { reason })) => {
57                return Err(ConnectError::other(format!("'{}' disconnected with reason \"{reason}\"", $peer_addr)));
58            }
59            // Received an unexpected message, abort.
60            Some(ty) => {
61                return Err(ConnectError::other(format!(
62                    "'{}' did not follow the handshake protocol: received {:?} instead of {}",
63                    $peer_addr,
64                    ty.name(),
65                    stringify!($msg_ty),
66                )));
67            }
68            // Received nothing.
69            None => return Err(ConnectError::IoError(io::ErrorKind::BrokenPipe.into())),
70        }
71    }};
72}
73
74/// Send the given message to the peer.
75async fn send<N: Network>(
76    framed: &mut Framed<&mut TcpStream, MessageCodec<N>>,
77    peer_addr: SocketAddr,
78    message: Message<N>,
79) -> io::Result<()> {
80    trace!("Sending '{}' to '{peer_addr}'", message.name());
81    framed.send(message).await
82}
83
84impl<N: Network> Router<N> {
85    /// Executes the handshake protocol.
86    pub async fn handshake<'a>(
87        &'a self,
88        peer_addr: SocketAddr,
89        stream: &'a mut TcpStream,
90        peer_side: ConnectionSide,
91        genesis_header: Header<N>,
92        restrictions_id: Field<N>,
93    ) -> Result<ChallengeRequest<N>, ConnectError> {
94        // If this is an inbound connection, we log it, but don't know the listening address yet.
95        // Otherwise, we can immediately register the listening address.
96        let mut listener_addr = if peer_side == ConnectionSide::Initiator {
97            debug!("Received a connection request from '{peer_addr}'");
98            None
99        } else {
100            debug!("Shaking hands with '{peer_addr}'...");
101            Some(peer_addr)
102        };
103
104        // Check (or impose) IP-level bans.
105        #[cfg(not(feature = "test"))]
106        if !self.is_dev() && peer_side == ConnectionSide::Initiator {
107            // If the IP is already banned reject the connection.
108            if self.is_ip_banned(peer_addr.ip()) {
109                trace!("Rejected a connection request from banned IP '{}'", peer_addr.ip());
110                return Err(ConnectError::other(anyhow!("'{}' is a banned IP address", peer_addr.ip())));
111            }
112
113            let num_attempts =
114                self.cache.insert_inbound_connection(peer_addr.ip(), Router::<N>::CONNECTION_ATTEMPTS_SINCE_SECS);
115
116            debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
117            if num_attempts > Router::<N>::MAX_CONNECTION_ATTEMPTS {
118                self.update_ip_ban(peer_addr.ip());
119                trace!("Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
120                return Err(ConnectError::other(anyhow!("'{}' appears to be spamming connections", peer_addr.ip())));
121            }
122        }
123
124        // Perform the handshake; we pass on a mutable reference to listener_addr in case the process is broken at any point in time.
125        let handshake_result = match peer_side {
126            ConnectionSide::Responder => {
127                self.handshake_inner_initiator(peer_addr, stream, genesis_header, restrictions_id).await
128            }
129            ConnectionSide::Initiator => {
130                self.handshake_inner_responder(peer_addr, &mut listener_addr, stream, genesis_header, restrictions_id)
131                    .await
132            }
133        };
134
135        if let Some(addr) = listener_addr {
136            match handshake_result {
137                Ok(ref cr) => {
138                    if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
139                        self.resolver.write().insert_peer(peer.listener_addr(), peer_addr, Some(cr.address));
140                        peer.upgrade_to_connected(
141                            peer_addr,
142                            cr.listener_port,
143                            cr.address,
144                            cr.node_type,
145                            cr.version,
146                            cr.snarkos_sha,
147                            ConnectionMode::Router,
148                        );
149                    }
150
151                    #[cfg(feature = "metrics")]
152                    self.update_metrics();
153                }
154                Err(_) => {
155                    if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
156                        // The peer may only be downgraded if it's a ConnectingPeer.
157                        if peer.is_connecting() {
158                            peer.downgrade_to_candidate(addr);
159                        }
160                    }
161                }
162            }
163        }
164
165        handshake_result
166    }
167
168    /// The connection initiator side of the handshake.
169    async fn handshake_inner_initiator<'a>(
170        &'a self,
171        peer_addr: SocketAddr,
172        stream: &'a mut TcpStream,
173        genesis_header: Header<N>,
174        restrictions_id: Field<N>,
175    ) -> Result<ChallengeRequest<N>, ConnectError> {
176        // Introduce the peer into the peer pool.
177        // If we are connecting, the peer and listener address are identical.
178        self.add_connecting_peer(peer_addr)?;
179
180        // Construct the stream.
181        let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
182
183        // Initialize an RNG.
184        let rng = &mut OsRng;
185
186        // Determine the snarkOS SHA to send to the peer.
187        let current_block_height = self.ledger.latest_block_height();
188        let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
189        let snarkos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
190            (true, Some(sha)) => Some(sha),
191            _ => None,
192        };
193
194        /* Step 1: Send the challenge request. */
195
196        // Sample a random nonce.
197        let our_nonce = rng.r#gen();
198        // Send a challenge request to the peer.
199        let our_request =
200            ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce, snarkos_sha);
201        send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?;
202
203        /* Step 2: Receive the peer's challenge response followed by the challenge request. */
204
205        // Listen for the challenge response message.
206        let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
207        // Listen for the challenge request message.
208        let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
209
210        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
211        if let Some(reason) = self
212            .verify_challenge_response(
213                peer_addr,
214                peer_request.address,
215                peer_request.node_type,
216                peer_response,
217                genesis_header,
218                restrictions_id,
219                our_nonce,
220            )
221            .await
222        {
223            send(&mut framed, peer_addr, reason.into()).await?;
224            return Err(reason.into_connect_error(peer_addr));
225        }
226
227        // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
228        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
229            send(&mut framed, peer_addr, reason.into()).await?;
230            return Err(reason.into_connect_error(peer_addr));
231        }
232
233        /* Step 3: Send the challenge response. */
234
235        let response_nonce: u64 = rng.r#gen();
236        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
237        // Sign the counterparty nonce.
238        let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
239            return Err(ConnectError::other(anyhow!("Failed to sign the challenge request nonce")));
240        };
241        // Send the challenge response.
242        let our_response = ChallengeResponse {
243            genesis_header,
244            restrictions_id,
245            signature: Data::Object(our_signature),
246            nonce: response_nonce,
247        };
248        send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
249
250        Ok(peer_request)
251    }
252
253    /// The connection responder side of the handshake.
254    async fn handshake_inner_responder<'a>(
255        &'a self,
256        peer_addr: SocketAddr,
257        listener_addr: &mut Option<SocketAddr>,
258        stream: &'a mut TcpStream,
259        genesis_header: Header<N>,
260        restrictions_id: Field<N>,
261    ) -> Result<ChallengeRequest<N>, ConnectError> {
262        // Construct the stream.
263        let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
264
265        /* Step 1: Receive the challenge request. */
266
267        // Wait for the challenge request message.
268        let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
269
270        // Determine the snarkOS SHA to send to the peer.
271        let current_block_height = self.ledger.latest_block_height();
272        let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
273        let snarkos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
274            (true, Some(sha)) => Some(sha),
275            _ => None,
276        };
277
278        // Obtain the peer's listening address.
279        *listener_addr = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
280        let listener_addr = listener_addr.unwrap();
281
282        // Knowing the peer's listening address, ensure it is allowed to connect.
283        if let Err(reason) = self.ensure_peer_is_allowed(listener_addr) {
284            send(&mut framed, peer_addr, reason.into()).await?;
285            return Err(reason.into_connect_error(listener_addr));
286        }
287
288        // Introduce the peer into the peer pool.
289        self.add_connecting_peer(listener_addr)?;
290
291        // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
292        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
293            send(&mut framed, peer_addr, reason.into()).await?;
294            return Err(reason.into_connect_error(peer_addr));
295        }
296
297        /* Step 2: Send the challenge response followed by own challenge request. */
298
299        // Initialize an RNG.
300        let rng = &mut OsRng;
301
302        // Sign the counterparty nonce.
303        let response_nonce: u64 = rng.r#gen();
304        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
305        let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
306            return Err(ConnectError::Other(
307                anyhow!("Failed to sign the challenge request nonce from '{peer_addr}'").into(),
308            ));
309        };
310        // Send the challenge response.
311        let our_response = ChallengeResponse {
312            genesis_header,
313            restrictions_id,
314            signature: Data::Object(our_signature),
315            nonce: response_nonce,
316        };
317        send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
318
319        // Sample a random nonce.
320        let our_nonce = rng.r#gen();
321        // Send the challenge request.
322        let our_request =
323            ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce, snarkos_sha);
324        send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?;
325
326        /* Step 3: Receive the challenge response. */
327
328        // Wait for the challenge response message.
329        let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
330
331        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
332        if let Some(reason) = self
333            .verify_challenge_response(
334                peer_addr,
335                peer_request.address,
336                peer_request.node_type,
337                peer_response,
338                genesis_header,
339                restrictions_id,
340                our_nonce,
341            )
342            .await
343        {
344            send(&mut framed, peer_addr, reason.into()).await?;
345            Err(reason.into_connect_error(peer_addr))
346        } else {
347            Ok(peer_request)
348        }
349    }
350
351    /// Ensure the peer is allowed to connect.
352    fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<(), DisconnectReason> {
353        // Ensure that it's not a self-connect attempt.
354        if self.is_local_ip(listener_addr) {
355            return Err(DisconnectReason::SelfConnect);
356        }
357        // As a validator, only accept connections from trusted peers and bootstrap nodes.
358        if self.node_type() == NodeType::Validator
359            && !self.is_trusted(listener_addr)
360            && !crate::bootstrap_peers::<N>(self.is_dev()).contains(&listener_addr)
361        {
362            return Err(DisconnectReason::NoExternalPeersAllowed);
363        }
364        // If the node is in trusted peers only mode, ensure the peer is explicitly trusted.
365        if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
366            return Err(DisconnectReason::NoExternalPeersAllowed);
367        }
368
369        Ok(())
370    }
371
372    /// Verifies the given challenge request. Returns a disconnect reason if the request is invalid.
373    fn verify_challenge_request(
374        &self,
375        peer_addr: SocketAddr,
376        message: &ChallengeRequest<N>,
377    ) -> Option<DisconnectReason> {
378        // Retrieve the components of the challenge request.
379        let &ChallengeRequest { version, listener_port: _, node_type, address, nonce: _, ref snarkos_sha } = message;
380        log_repo_sha_comparison(peer_addr, snarkos_sha, Self::OWNER);
381
382        // Ensure the message protocol version is not outdated.
383        if !self.is_valid_message_version(version) {
384            warn!("Dropping '{peer_addr}' on version {version} (outdated)");
385            return Some(DisconnectReason::OutdatedClientVersion);
386        }
387
388        // Ensure there are no validators connected with the given Aleo address.
389        if self.node_type() == NodeType::Validator
390            && node_type == NodeType::Validator
391            && self.is_connected_address(address)
392        {
393            warn!("Dropping '{peer_addr}' for being already connected ({address})");
394            return Some(DisconnectReason::NoReasonGiven);
395        }
396
397        None
398    }
399
400    /// Verifies the given challenge response. Returns a disconnect reason if the response is invalid.
401    #[allow(clippy::too_many_arguments)]
402    async fn verify_challenge_response(
403        &self,
404        peer_addr: SocketAddr,
405        peer_address: Address<N>,
406        peer_node_type: NodeType,
407        response: ChallengeResponse<N>,
408        expected_genesis_header: Header<N>,
409        expected_restrictions_id: Field<N>,
410        expected_nonce: u64,
411    ) -> Option<DisconnectReason> {
412        // Retrieve the components of the challenge response.
413        let ChallengeResponse { genesis_header, restrictions_id, signature, nonce } = response;
414
415        // Verify the challenge response, by checking that the block header matches.
416        if genesis_header != expected_genesis_header {
417            warn!("Handshake with '{peer_addr}' failed (incorrect block header)");
418            return Some(DisconnectReason::InvalidChallengeResponse);
419        }
420        // Verify the restrictions ID.
421        if !peer_node_type.is_prover() && !self.node_type.is_prover() && restrictions_id != expected_restrictions_id {
422            warn!("Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
423            return Some(DisconnectReason::InvalidChallengeResponse);
424        }
425        // Perform the deferred non-blocking deserialization of the signature.
426        let Ok(signature) = signature.deserialize().await else {
427            warn!("Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
428            return Some(DisconnectReason::InvalidChallengeResponse);
429        };
430        // Verify the signature.
431        if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
432            warn!("Handshake with '{peer_addr}' failed (invalid signature)");
433            return Some(DisconnectReason::InvalidChallengeResponse);
434        }
435        None
436    }
437}