Skip to main content

snarkos_node/bootstrap_client/
mod.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod codec;
17mod handshake;
18mod network;
19
20use crate::tcp::{self, Tcp};
21use snarkos_account::Account;
22use snarkos_node_network::{ConnectionMode, Peer, Resolver};
23use snarkos_node_tcp::{P2P, protocols::*};
24use snarkos_utilities::SignalHandler;
25use snarkvm::{
26    ledger::committee::Committee,
27    prelude::{Address, Field, Header, Network, PrivateKey, ViewKey},
28    synthesizer::Restrictions,
29};
30
31#[cfg(feature = "locktick")]
32use locktick::{
33    parking_lot::{Mutex, RwLock},
34    tokio::Mutex as TMutex,
35};
36#[cfg(not(feature = "locktick"))]
37use parking_lot::{Mutex, RwLock};
38use std::{
39    collections::{HashMap, HashSet},
40    net::SocketAddr,
41    ops::Deref,
42    str::FromStr,
43    sync::Arc,
44    time::{Duration, Instant},
45};
46#[cfg(not(feature = "locktick"))]
47use tokio::sync::Mutex as TMutex;
48use tokio::sync::oneshot;
49
50#[derive(Clone)]
51pub struct BootstrapClient<N: Network>(Arc<InnerBootstrapClient<N>>);
52
53impl<N: Network> Deref for BootstrapClient<N> {
54    type Target = Arc<InnerBootstrapClient<N>>;
55
56    fn deref(&self) -> &Self::Target {
57        &self.0
58    }
59}
60
61// A tuple holding the validator's Aleo address, and its connection mode.
62type KnownValidatorInfo<N> = (Address<N>, ConnectionMode);
63
64pub struct InnerBootstrapClient<N: Network> {
65    tcp: Tcp,
66    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
67    known_validators: RwLock<HashMap<SocketAddr, KnownValidatorInfo<N>>>,
68    resolver: RwLock<Resolver<N>>,
69    account: Account<N>,
70    genesis_header: Header<N>,
71    restrictions_id: Field<N>,
72    http_client: reqwest::Client,
73    latest_committee: TMutex<(HashSet<Address<N>>, Instant)>,
74    dev: Option<u16>,
75    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
76}
77
78impl<N: Network> BootstrapClient<N> {
79    // The interval for validator committee refreshes.
80    const COMMITTEE_REFRESH_TIME: Duration = Duration::from_secs(20);
81    // The maximum amount of time per connection.
82    const CONNECTION_LIFETIME: Duration = Duration::from_secs(15);
83    // The maximum number of connected peers.
84    const MAX_PEERS: u16 = 1_000;
85
86    pub async fn new(
87        listener_addr: SocketAddr,
88        account: Account<N>,
89        genesis_header: Header<N>,
90        dev: Option<u16>,
91    ) -> anyhow::Result<Self> {
92        // Initialize the TCP stack.
93        let tcp = Tcp::new(tcp::Config::new(listener_addr, Self::MAX_PEERS));
94        // Initialize the peer pool.
95        let peer_pool = Default::default();
96        // Initialize a collection of validators.
97        let known_validators = Default::default();
98        // Load the restrictions ID.
99        let restrictions_id = Restrictions::load()?.restrictions_id();
100        // Create a resolver.
101        let resolver = Default::default();
102        // Create an HTTP client to obtain the current committee.
103        let http_client = reqwest::Client::new();
104        // Prepare a placeholder committee, ensuring that it's insta-outdated.
105        let latest_committee = TMutex::new((Default::default(), Instant::now() - Self::COMMITTEE_REFRESH_TIME));
106
107        // Prepare the shutdown channel.
108        let (shutdown_tx, shutdown_rx) = oneshot::channel();
109        let shutdown_tx = Mutex::new(Some(shutdown_tx));
110
111        // Construct and return the bootstrap client.
112        let inner = InnerBootstrapClient {
113            tcp,
114            peer_pool,
115            known_validators,
116            resolver,
117            account,
118            genesis_header,
119            restrictions_id,
120            http_client,
121            latest_committee,
122            dev,
123            shutdown_tx,
124        };
125        let node = BootstrapClient(Arc::new(inner));
126
127        // Enable the TCP protocols.
128        node.enable_handshake().await;
129        node.enable_reading().await;
130        node.enable_writing().await;
131        node.enable_disconnect().await;
132        node.enable_on_connect().await;
133        // Enable the TCP listener. Note: This must be called after the above protocols.
134        node.tcp().enable_listener().await.expect("Failed to enable the TCP listener");
135
136        // Await the shutdown signal.
137        let _ = shutdown_rx.await;
138
139        Ok(node)
140    }
141
142    /// Returns the account address of the node.
143    pub fn address(&self) -> Address<N> {
144        self.account.address()
145    }
146
147    /// Returns the account private key of the node.
148    pub fn private_key(&self) -> &PrivateKey<N> {
149        self.account.private_key()
150    }
151
152    /// Returns the account view key of the node.
153    pub fn view_key(&self) -> &ViewKey<N> {
154        self.account.view_key()
155    }
156
157    /// Returns the listener IP address from the connected peer address.
158    pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
159        self.resolver.read().get_listener(connected_addr)
160    }
161
162    /// Returns `true` if the node is in development mode.
163    pub fn is_dev(&self) -> bool {
164        self.dev.is_some()
165    }
166
167    /// Returns the current validator committee or updates it from the explorer, if
168    /// we are capable of obtaining it from the network.
169    pub async fn get_or_update_committee(&self) -> anyhow::Result<Option<HashSet<Address<N>>>> {
170        // Development testing may include a list of committee Aleo addresses loaded from the environment.
171        if cfg!(feature = "test") || self.is_dev() {
172            match std::env::var("TEST_COMMITTEE_ADDRS") {
173                Ok(aleo_addrs) => {
174                    let dev_committee =
175                        aleo_addrs.split(',').map(|addr| Address::<N>::from_str(addr).unwrap()).collect();
176                    return Ok(Some(dev_committee));
177                }
178                Err(err) => {
179                    warn!("Failed to load committee peers from environment: {err}");
180                    return Ok(None);
181                }
182            }
183        }
184
185        let now = Instant::now();
186        let (committee, timestamp) = &mut *self.latest_committee.lock().await;
187        if now - *timestamp >= Self::COMMITTEE_REFRESH_TIME {
188            debug!("Updating the validator committee");
189            *timestamp = now;
190            let committe_query_addr =
191                format!("https://api.explorer.provable.com/v2/{}/committee/latest", N::SHORT_NAME);
192            let response = self.http_client.get(committe_query_addr).send().await?;
193            debug!("Received response from the explorer: {:?}", response);
194            let json = response.text().await?;
195            let full_committee = Committee::from_str(&json)?;
196            *committee = full_committee.members().keys().copied().collect();
197            debug!("The validator committee has {} members now", committee.len());
198
199            Ok(Some(committee.clone()))
200        } else {
201            Ok(Some(committee.clone()))
202        }
203    }
204
205    // Return the known addresses of current committee members, or all known
206    // validators if the committee info is unavailable.
207    pub async fn get_validator_addrs(&self) -> HashMap<SocketAddr, KnownValidatorInfo<N>> {
208        // First, collect info on all the validators we had connected to before.
209        let mut known_validators = self.known_validators.read().clone();
210        // If the committee info is available, prune non-committee members.
211        match self.get_or_update_committee().await {
212            Ok(Some(committee)) => {
213                known_validators.retain(|_, (aleo_addr, _)| committee.contains(aleo_addr));
214                known_validators
215            }
216            Ok(None) => known_validators,
217            Err(error) => {
218                error!("Couldn't update the validator committee: {error}");
219                known_validators
220            }
221        }
222    }
223
224    /// Shuts down the bootstrap client.
225    pub async fn shut_down(&self) {
226        info!("Shutting down the bootstrap client...");
227
228        // Shut down the low-level network features.
229        self.tcp.shut_down().await;
230
231        // Shut down the node.
232        if let Some(shutdown_tx) = self.shutdown_tx.lock().take() {
233            let _ = shutdown_tx.send(());
234        }
235    }
236
237    /// Blocks until a shutdown signal was received or manual shutdown was triggered.
238    pub async fn wait_for_signals(&self, handler: &SignalHandler) {
239        handler.wait_for_signals().await;
240
241        // If the node is already initialized, then shut it down.
242        self.shut_down().await;
243    }
244}