Skip to main content

snarkos_node/bootstrap_client/
network.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    BootstrapClient,
18    bft::{
19        MAX_VALIDATORS_TO_SEND,
20        events::{self, Event},
21    },
22    bootstrap_client::codec::BootstrapClientCodec,
23    network::{ConnectionMode, NodeType, Peer, PeerPoolHandling, Resolver},
24    router::{
25        MAX_PEERS_TO_SEND,
26        messages::{self, Message},
27    },
28    tcp::{ConnectionSide, P2P, Tcp, protocols::*},
29};
30use snarkvm::prelude::Network;
31
32use indexmap::IndexMap;
33#[cfg(feature = "locktick")]
34use locktick::parking_lot::RwLock;
35#[cfg(not(feature = "locktick"))]
36use parking_lot::RwLock;
37use std::{collections::HashMap, io, net::SocketAddr};
38use tokio::time::sleep;
39use tokio_util::codec::Decoder;
40
41impl<N: Network> P2P for BootstrapClient<N> {
42    fn tcp(&self) -> &Tcp {
43        &self.tcp
44    }
45}
46
47impl<N: Network> PeerPoolHandling<N> for BootstrapClient<N> {
48    const MAXIMUM_POOL_SIZE: usize = 10_000;
49    const OWNER: &'static str = "[Network]";
50    const PEER_SLASHING_COUNT: usize = 200;
51
52    fn is_dev(&self) -> bool {
53        self.dev.is_some()
54    }
55
56    fn trusted_peers_only(&self) -> bool {
57        false
58    }
59
60    fn node_type(&self) -> NodeType {
61        NodeType::BootstrapClient
62    }
63
64    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
65        &self.peer_pool
66    }
67
68    fn resolver(&self) -> &RwLock<Resolver<N>> {
69        &self.resolver
70    }
71}
72
73/// The bootstrap client can handle both validator and non-validator messages.
74#[derive(Debug)]
75pub enum MessageOrEvent<N: Network> {
76    Message(Message<N>),
77    Event(Event<N>),
78}
79
80#[async_trait]
81impl<N: Network> OnConnect for BootstrapClient<N> {
82    async fn on_connect(&self, peer_addr: SocketAddr) {
83        // If the peer is connected in validator (Gateway) mode, save it to the collection
84        // of known validators.
85        if let Some(listener_addr) = self.resolve_to_listener(peer_addr) {
86            if let Some(peer) = self.get_connected_peer(listener_addr) {
87                if peer.node_type == NodeType::Validator {
88                    self.known_validators.write().insert(listener_addr, (peer.aleo_addr, peer.connection_mode));
89                }
90            }
91        }
92        // The peers should only ask us for the peer list; spawn a task that will
93        // terminate the connection after a while.
94        let tcp = self.tcp().clone();
95        tokio::spawn(async move {
96            sleep(Self::CONNECTION_LIFETIME).await;
97            tcp.disconnect(peer_addr).await;
98        });
99    }
100}
101
102#[async_trait]
103impl<N: Network> Disconnect for BootstrapClient<N> {
104    /// Any extra operations to be performed during a disconnect.
105    async fn handle_disconnect(&self, peer_addr: SocketAddr) {
106        if let Some(listener_addr) = self.resolve_to_listener(peer_addr) {
107            self.downgrade_peer_to_candidate(listener_addr);
108        }
109    }
110}
111
112#[async_trait]
113impl<N: Network> Reading for BootstrapClient<N> {
114    type Codec = BootstrapClientCodec<N>;
115    type Message = <BootstrapClientCodec<N> as Decoder>::Item;
116
117    /// Creates a [`Decoder`] used to interpret messages from the network.
118    /// The `side` param indicates the connection side **from the node's perspective**.
119    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
120        Default::default()
121    }
122
123    /// Processes a message received from the network.
124    async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
125        // Identify the connected peer.
126        let Some(listener_addr) = self.resolve_to_listener(peer_addr) else {
127            // Already disconnecting, ignore.
128            return Ok(());
129        };
130
131        // Handle the right peer request.
132        match message {
133            MessageOrEvent::Message(Message::PeerRequest(_)) => {
134                debug!("Received a PeerRequest from '{listener_addr}'");
135                let mut peers = self.get_candidate_peers();
136
137                // In order to filter out validators properly, we'll need the
138                // peer's node type and the list of validators.
139                let Some(peer) = self.get_connected_peer(listener_addr) else {
140                    return Ok(());
141                };
142                let validators = self.get_validator_addrs().await;
143
144                if peer.node_type == NodeType::Validator {
145                    // Filter out Gateway addresses.
146                    peers.retain(|peer| {
147                        validators
148                            .get(&peer.listener_addr)
149                            .map(|(_, connection_mode)| *connection_mode != ConnectionMode::Gateway)
150                            .unwrap_or(true)
151                    });
152                } else {
153                    // Filter out all validator addresses.
154                    peers.retain(|peer| !validators.contains_key(&peer.listener_addr));
155                }
156                peers.truncate(MAX_PEERS_TO_SEND);
157                let peers = peers.into_iter().map(|peer| (peer.listener_addr, None)).collect::<Vec<_>>();
158
159                debug!("Sending {} peer address(es) to '{listener_addr}'", peers.len());
160                let msg = MessageOrEvent::Message(Message::PeerResponse(messages::PeerResponse { peers }));
161                if let Err(err) = self.unicast(peer_addr, msg)?.await {
162                    warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting");
163                } else {
164                    debug!("Disconnecting from '{listener_addr}' - peers provided");
165                }
166
167                self.tcp().disconnect(peer_addr).await;
168            }
169            MessageOrEvent::Event(Event::ValidatorsRequest(_)) => {
170                debug!("Received a ValidatorsRequest from '{listener_addr}'");
171
172                // Procure a list of applicable validator addresses.
173                let validators = self.get_validator_addrs().await;
174                let validators = validators
175                    .into_iter()
176                    .filter_map(|(listener_addr, (aleo_addr, connection_mode))| {
177                        // Only pick addresses connected in Gateway mode.
178                        (connection_mode == ConnectionMode::Gateway).then_some((listener_addr, aleo_addr))
179                    })
180                    .take(MAX_VALIDATORS_TO_SEND)
181                    .collect::<IndexMap<_, _>>();
182
183                debug!("Sending {} validator address(es) to '{listener_addr}'", validators.len());
184                let msg = MessageOrEvent::Event(Event::ValidatorsResponse(events::ValidatorsResponse { validators }));
185                if let Err(err) = self.unicast(peer_addr, msg)?.await {
186                    warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting");
187                } else {
188                    debug!("Disconnecting from '{listener_addr}' - peers provided");
189                }
190
191                self.tcp().disconnect(peer_addr).await;
192            }
193            msg => {
194                let name = match msg {
195                    MessageOrEvent::Message(msg) => msg.name(),
196                    MessageOrEvent::Event(msg) => msg.name(),
197                };
198                trace!("Ignoring an unhandled message ({name}) from {listener_addr}");
199            }
200        }
201
202        Ok(())
203    }
204}
205
206#[async_trait]
207impl<N: Network> Writing for BootstrapClient<N> {
208    type Codec = BootstrapClientCodec<N>;
209    type Message = MessageOrEvent<N>;
210
211    /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
212    /// The `side` parameter indicates the connection side **from the node's perspective**.
213    fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
214        Default::default()
215    }
216}