Skip to main content

snarkos_node/bootstrap_client/
network.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    BootstrapClient,
18    bft::{
19        MAX_VALIDATORS_TO_SEND,
20        events::{self, Event},
21    },
22    bootstrap_client::codec::BootstrapClientCodec,
23    network::{ConnectionMode, NodeType, Peer, PeerPoolHandling, Resolver},
24    router::{
25        MAX_PEERS_TO_SEND,
26        messages::{self, Message},
27    },
28    tcp::{ConnectionSide, P2P, Tcp, protocols::*},
29};
30use snarkvm::prelude::Network;
31
32use indexmap::IndexMap;
33#[cfg(feature = "locktick")]
34use locktick::parking_lot::RwLock;
35#[cfg(not(feature = "locktick"))]
36use parking_lot::RwLock;
37use std::{collections::HashMap, io, net::SocketAddr};
38use tokio::time::sleep;
39use tokio_util::codec::Decoder;
40
41impl<N: Network> P2P for BootstrapClient<N> {
42    fn tcp(&self) -> &Tcp {
43        &self.tcp
44    }
45}
46
47impl<N: Network> PeerPoolHandling<N> for BootstrapClient<N> {
48    const MAXIMUM_POOL_SIZE: usize = 10_000;
49    const OWNER: &'static str = "[Network]";
50    const PEER_SLASHING_COUNT: usize = 200;
51
52    fn is_dev(&self) -> bool {
53        self.dev.is_some()
54    }
55
56    fn trusted_peers_only(&self) -> bool {
57        false
58    }
59
60    fn node_type(&self) -> NodeType {
61        NodeType::BootstrapClient
62    }
63
64    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
65        &self.peer_pool
66    }
67
68    fn resolver(&self) -> &RwLock<Resolver<N>> {
69        &self.resolver
70    }
71}
72
73/// The bootstrap client can handle both validator and non-validator messages.
74#[derive(Debug)]
75pub enum MessageOrEvent<N: Network> {
76    Message(Message<N>),
77    Event(Event<N>),
78}
79
80#[async_trait]
81impl<N: Network> OnConnect for BootstrapClient<N> {
82    async fn on_connect(&self, peer_addr: SocketAddr) {
83        // If the peer is connected in validator (Gateway) mode, save it to the collection
84        // of known validators.
85        if let Some(listener_addr) = self.resolve_to_listener(peer_addr)
86            && let Some(peer) = self.get_connected_peer(listener_addr)
87            && peer.node_type == NodeType::Validator
88        {
89            self.known_validators.write().insert(listener_addr, (peer.aleo_addr, peer.connection_mode));
90        }
91        // The peers should only ask us for the peer list; spawn a task that will
92        // terminate the connection after a while.
93        let tcp = self.tcp().clone();
94        tokio::spawn(async move {
95            sleep(Self::CONNECTION_LIFETIME).await;
96            tcp.disconnect(peer_addr).await;
97        });
98    }
99}
100
101#[async_trait]
102impl<N: Network> Disconnect for BootstrapClient<N> {
103    /// Any extra operations to be performed during a disconnect.
104    async fn handle_disconnect(&self, peer_addr: SocketAddr) {
105        if let Some(listener_addr) = self.resolve_to_listener(peer_addr) {
106            self.downgrade_peer_to_candidate(listener_addr);
107        }
108    }
109}
110
111#[async_trait]
112impl<N: Network> Reading for BootstrapClient<N> {
113    type Codec = BootstrapClientCodec<N>;
114    type Message = <BootstrapClientCodec<N> as Decoder>::Item;
115
116    /// Creates a [`Decoder`] used to interpret messages from the network.
117    /// The `side` param indicates the connection side **from the node's perspective**.
118    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
119        Default::default()
120    }
121
122    /// Processes a message received from the network.
123    async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
124        // Identify the connected peer.
125        let Some(listener_addr) = self.resolve_to_listener(peer_addr) else {
126            // Already disconnecting, ignore.
127            return Ok(());
128        };
129
130        // Handle the right peer request.
131        match message {
132            MessageOrEvent::Message(Message::PeerRequest(_)) => {
133                debug!("Received a PeerRequest from '{listener_addr}'");
134                let mut peers = self.get_candidate_peers();
135
136                // In order to filter out validators properly, we'll need the
137                // peer's node type and the list of validators.
138                let Some(peer) = self.get_connected_peer(listener_addr) else {
139                    return Ok(());
140                };
141                let validators = self.get_validator_addrs().await;
142
143                if peer.node_type == NodeType::Validator {
144                    // Filter out Gateway addresses.
145                    peers.retain(|peer| {
146                        validators
147                            .get(&peer.listener_addr)
148                            .map(|(_, connection_mode)| *connection_mode != ConnectionMode::Gateway)
149                            .unwrap_or(true)
150                    });
151                } else {
152                    // Filter out all validator addresses.
153                    peers.retain(|peer| !validators.contains_key(&peer.listener_addr));
154                }
155                peers.truncate(MAX_PEERS_TO_SEND);
156                let peers = peers.into_iter().map(|peer| (peer.listener_addr, None)).collect::<Vec<_>>();
157
158                debug!("Sending {} peer address(es) to '{listener_addr}'", peers.len());
159                let msg = MessageOrEvent::Message(Message::PeerResponse(messages::PeerResponse { peers }));
160                if let Err(err) = self.unicast(peer_addr, msg)?.await {
161                    warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting");
162                } else {
163                    debug!("Disconnecting from '{listener_addr}' - peers provided");
164                }
165
166                self.tcp().disconnect(peer_addr).await;
167            }
168            MessageOrEvent::Event(Event::ValidatorsRequest(_)) => {
169                debug!("Received a ValidatorsRequest from '{listener_addr}'");
170
171                // Procure a list of applicable validator addresses.
172                let validators = self.get_validator_addrs().await;
173                let validators = validators
174                    .into_iter()
175                    .filter_map(|(listener_addr, (aleo_addr, connection_mode))| {
176                        // Only pick addresses connected in Gateway mode.
177                        (connection_mode == ConnectionMode::Gateway).then_some((listener_addr, aleo_addr))
178                    })
179                    .take(MAX_VALIDATORS_TO_SEND)
180                    .collect::<IndexMap<_, _>>();
181
182                debug!("Sending {} validator address(es) to '{listener_addr}'", validators.len());
183                let msg = MessageOrEvent::Event(Event::ValidatorsResponse(events::ValidatorsResponse { validators }));
184                if let Err(err) = self.unicast(peer_addr, msg)?.await {
185                    warn!("Couldn't deliver a peer list to '{listener_addr}': {err}; disconnecting");
186                } else {
187                    debug!("Disconnecting from '{listener_addr}' - peers provided");
188                }
189
190                self.tcp().disconnect(peer_addr).await;
191            }
192            msg => {
193                let name = match msg {
194                    MessageOrEvent::Message(msg) => msg.name(),
195                    MessageOrEvent::Event(msg) => msg.name(),
196                };
197                trace!("Ignoring an unhandled message ({name}) from {listener_addr}");
198            }
199        }
200
201        Ok(())
202    }
203}
204
205#[async_trait]
206impl<N: Network> Writing for BootstrapClient<N> {
207    type Codec = BootstrapClientCodec<N>;
208    type Message = MessageOrEvent<N>;
209
210    /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
211    /// The `side` parameter indicates the connection side **from the node's perspective**.
212    fn codec(&self, _addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
213        Default::default()
214    }
215}