snarkos_node/bootstrap_client/
network.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    BootstrapClient,
18    bft::{
19        MAX_VALIDATORS_TO_SEND,
20        events::{self, Event},
21    },
22    bootstrap_client::codec::BootstrapClientCodec,
23    router::{
24        MAX_PEERS_TO_SEND,
25        Peer,
26        PeerPoolHandling,
27        Resolver,
28        messages::{self, Message},
29    },
30    tcp::{ConnectionSide, P2P, Tcp, protocols::*},
31};
32use snarkvm::prelude::Network;
33
34use indexmap::IndexMap;
35#[cfg(feature = "locktick")]
36use locktick::parking_lot::RwLock;
37#[cfg(not(feature = "locktick"))]
38use parking_lot::RwLock;
39use std::{collections::HashMap, io, net::SocketAddr};
40use tokio::time::sleep;
41use tokio_util::codec::Decoder;
42
43impl<N: Network> P2P for BootstrapClient<N> {
44    fn tcp(&self) -> &Tcp {
45        &self.tcp
46    }
47}
48
49impl<N: Network> PeerPoolHandling<N> for BootstrapClient<N> {
50    const MAXIMUM_POOL_SIZE: usize = 10_000;
51    const OWNER: &'static str = "[Network]";
52    const PEER_SLASHING_COUNT: usize = 200;
53
54    fn is_dev(&self) -> bool {
55        self.dev.is_some()
56    }
57
58    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
59        &self.peer_pool
60    }
61
62    fn resolver(&self) -> &RwLock<Resolver<N>> {
63        &self.resolver
64    }
65}
66
67/// The bootstrap client can handle both validator and non-validator messages.
68#[derive(Debug)]
69pub enum MessageOrEvent<N: Network> {
70    Message(Message<N>),
71    Event(Event<N>),
72}
73
74#[async_trait]
75impl<N: Network> OnConnect for BootstrapClient<N> {
76    async fn on_connect(&self, peer_addr: SocketAddr) {
77        // If the peer is connected in validator (Gateway) mode, save it to the collection
78        // of known validators.
79        if let Some(listener_addr) = self.resolve_to_listener(peer_addr) {
80            if let Some(peer) = self.get_connected_peer(listener_addr) {
81                if self.resolver().read().get_peer_ip_for_address(peer.aleo_addr).is_some() {
82                    self.known_validators.write().insert(listener_addr, peer.aleo_addr);
83                }
84            }
85        }
86        // The peers should only ask us for the peer list; spawn a task that will
87        // terminate the connection after a while.
88        let tcp = self.tcp().clone();
89        tokio::spawn(async move {
90            sleep(Self::CONNECTION_LIFETIME).await;
91            tcp.disconnect(peer_addr).await;
92        });
93    }
94}
95
96#[async_trait]
97impl<N: Network> Disconnect for BootstrapClient<N> {
98    /// Any extra operations to be performed during a disconnect.
99    async fn handle_disconnect(&self, peer_addr: SocketAddr) {
100        if let Some(listener_addr) = self.resolve_to_listener(peer_addr) {
101            self.downgrade_peer_to_candidate(listener_addr);
102        }
103    }
104}
105
106#[async_trait]
107impl<N: Network> Reading for BootstrapClient<N> {
108    type Codec = BootstrapClientCodec<N>;
109    type Message = <BootstrapClientCodec<N> as Decoder>::Item;
110
111    /// Creates a [`Decoder`] used to interpret messages from the network.
112    /// The `side` param indicates the connection side **from the node's perspective**.
113    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
114        Default::default()
115    }
116
117    /// Processes a message received from the network.
118    async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
119        // Identify the connected peer.
120        let Some(listener_addr) = self.resolve_to_listener(peer_addr) else {
121            // Already disconnecting, ignore.
122            return Ok(());
123        };
124
125        // Handle the right peer request.
126        match message {
127            MessageOrEvent::Message(Message::PeerRequest(_)) => {
128                debug!("Received a PeerRequest from '{listener_addr}'");
129                let mut peers = self.get_candidate_peers();
130
131                // Filter out peers who had been connected via the Gateway.
132                let known_validators = self.get_known_validators();
133                peers.retain(|peer| !known_validators.contains_key(&peer.listener_addr));
134                peers.truncate(MAX_PEERS_TO_SEND);
135                let peers = peers.into_iter().map(|peer| (peer.listener_addr, None)).collect::<Vec<_>>();
136
137                debug!("Sending {} peer address(es) to '{listener_addr}'", peers.len());
138                let msg = MessageOrEvent::Message(Message::PeerResponse(messages::PeerResponse { peers }));
139                if let Err(err) = self.unicast(peer_addr, msg)?.await {
140                    warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting");
141                } else {
142                    debug!("Disconnecting from '{listener_addr}' - peers provided");
143                }
144
145                self.tcp().disconnect(peer_addr).await;
146            }
147            MessageOrEvent::Event(Event::ValidatorsRequest(_)) => {
148                debug!("Received a ValidatorsRequest from '{listener_addr}'");
149                let current_committee = match self.get_or_update_committee().await {
150                    Ok(new_committee) => new_committee,
151                    Err(error) => {
152                        error!("Couldn't update the validator committee: {error}");
153                        None
154                    }
155                };
156
157                // Return the known addresses of current committee members, or all known
158                // validators if the committee info is unavailable.
159                let mut known_validators = self.get_known_validators();
160                if let Some(committee) = &current_committee {
161                    known_validators.retain(|_, aleo_addr| committee.contains(aleo_addr));
162                }
163                let validators = known_validators.into_iter().take(MAX_VALIDATORS_TO_SEND).collect::<IndexMap<_, _>>();
164
165                debug!("Sending {} validator address(es) to '{listener_addr}'", validators.len());
166                let msg = MessageOrEvent::Event(Event::ValidatorsResponse(events::ValidatorsResponse { validators }));
167                if let Err(err) = self.unicast(peer_addr, msg)?.await {
168                    warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting");
169                } else {
170                    debug!("Disconnecting from '{listener_addr}' - peers provided");
171                }
172
173                self.tcp().disconnect(peer_addr).await;
174            }
175            msg => {
176                let name = match msg {
177                    MessageOrEvent::Message(msg) => msg.name(),
178                    MessageOrEvent::Event(msg) => msg.name(),
179                };
180                trace!("Ignoring an unhandled message ({name}) from {listener_addr}");
181            }
182        }
183
184        Ok(())
185    }
186}
187
188#[async_trait]
189impl<N: Network> Writing for BootstrapClient<N> {
190    type Codec = BootstrapClientCodec<N>;
191    type Message = MessageOrEvent<N>;
192
193    /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
194    /// The `side` parameter indicates the connection side **from the node's perspective**.
195    fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
196        Default::default()
197    }
198}