snarkos_node/bootstrap_client/
mod.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod codec;
17mod handshake;
18mod network;
19
20use crate::tcp::{self, Tcp};
21use snarkos_account::Account;
22use snarkos_node_network::{ConnectionMode, Peer, Resolver};
23use snarkos_node_tcp::{P2P, protocols::*};
24use snarkvm::{
25    ledger::committee::Committee,
26    prelude::{Address, Field, Header, Network, PrivateKey, ViewKey},
27    synthesizer::Restrictions,
28};
29
30#[cfg(feature = "locktick")]
31use locktick::{
32    parking_lot::{Mutex, RwLock},
33    tokio::Mutex as TMutex,
34};
35#[cfg(not(feature = "locktick"))]
36use parking_lot::{Mutex, RwLock};
37use std::{
38    collections::{HashMap, HashSet},
39    net::SocketAddr,
40    ops::Deref,
41    str::FromStr,
42    sync::Arc,
43    time::{Duration, Instant},
44};
45#[cfg(not(feature = "locktick"))]
46use tokio::sync::Mutex as TMutex;
47use tokio::sync::oneshot;
48
49#[derive(Clone)]
50pub struct BootstrapClient<N: Network>(Arc<InnerBootstrapClient<N>>);
51
52impl<N: Network> Deref for BootstrapClient<N> {
53    type Target = Arc<InnerBootstrapClient<N>>;
54
55    fn deref(&self) -> &Self::Target {
56        &self.0
57    }
58}
59
60// A tuple holding the validator's Aleo address, and its connection mode.
61type KnownValidatorInfo<N> = (Address<N>, ConnectionMode);
62
63pub struct InnerBootstrapClient<N: Network> {
64    tcp: Tcp,
65    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
66    known_validators: RwLock<HashMap<SocketAddr, KnownValidatorInfo<N>>>,
67    resolver: RwLock<Resolver<N>>,
68    account: Account<N>,
69    genesis_header: Header<N>,
70    restrictions_id: Field<N>,
71    http_client: reqwest::Client,
72    latest_committee: TMutex<(HashSet<Address<N>>, Instant)>,
73    dev: Option<u16>,
74    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
75}
76
77impl<N: Network> BootstrapClient<N> {
78    // The interval for validator committee refreshes.
79    const COMMITTEE_REFRESH_TIME: Duration = Duration::from_secs(20);
80    // The maximum amount of time per connection.
81    const CONNECTION_LIFETIME: Duration = Duration::from_secs(15);
82    // The maximum number of connected peers.
83    const MAX_PEERS: u16 = 1_000;
84
85    pub async fn new(
86        listener_addr: SocketAddr,
87        account: Account<N>,
88        genesis_header: Header<N>,
89        dev: Option<u16>,
90    ) -> anyhow::Result<Self> {
91        // Initialize the TCP stack.
92        let tcp = Tcp::new(tcp::Config::new(listener_addr, Self::MAX_PEERS));
93        // Initialize the peer pool.
94        let peer_pool = Default::default();
95        // Initialize a collection of validators.
96        let known_validators = Default::default();
97        // Load the restrictions ID.
98        let restrictions_id = Restrictions::load()?.restrictions_id();
99        // Create a resolver.
100        let resolver = Default::default();
101        // Create an HTTP client to obtain the current committee.
102        let http_client = reqwest::Client::new();
103        // Prepare a placeholder committee, ensuring that it's insta-outdated.
104        let latest_committee = TMutex::new((Default::default(), Instant::now() - Self::COMMITTEE_REFRESH_TIME));
105
106        // Prepare the shutdown channel.
107        let (shutdown_tx, shutdown_rx) = oneshot::channel();
108        let shutdown_tx = Mutex::new(Some(shutdown_tx));
109
110        // Construct and return the bootstrap client.
111        let inner = InnerBootstrapClient {
112            tcp,
113            peer_pool,
114            known_validators,
115            resolver,
116            account,
117            genesis_header,
118            restrictions_id,
119            http_client,
120            latest_committee,
121            dev,
122            shutdown_tx,
123        };
124        let node = BootstrapClient(Arc::new(inner));
125
126        // Enable the TCP protocols.
127        node.enable_handshake().await;
128        node.enable_reading().await;
129        node.enable_writing().await;
130        node.enable_disconnect().await;
131        node.enable_on_connect().await;
132        // Enable the TCP listener. Note: This must be called after the above protocols.
133        node.tcp().enable_listener().await.expect("Failed to enable the TCP listener");
134
135        // Await the shutdown signal.
136        let _ = shutdown_rx.await;
137
138        Ok(node)
139    }
140
141    /// Returns the account address of the node.
142    pub fn address(&self) -> Address<N> {
143        self.account.address()
144    }
145
146    /// Returns the account private key of the node.
147    pub fn private_key(&self) -> &PrivateKey<N> {
148        self.account.private_key()
149    }
150
151    /// Returns the account view key of the node.
152    pub fn view_key(&self) -> &ViewKey<N> {
153        self.account.view_key()
154    }
155
156    /// Returns the listener IP address from the connected peer address.
157    pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
158        self.resolver.read().get_listener(connected_addr)
159    }
160
161    /// Returns `true` if the node is in development mode.
162    pub fn is_dev(&self) -> bool {
163        self.dev.is_some()
164    }
165
166    /// Returns the current validator committee or updates it from the explorer, if
167    /// we are capable of obtaining it from the network.
168    pub async fn get_or_update_committee(&self) -> anyhow::Result<Option<HashSet<Address<N>>>> {
169        // Development testing may include a list of committee Aleo addresses loaded from the environment.
170        if cfg!(feature = "test") || self.is_dev() {
171            match std::env::var("TEST_COMMITTEE_ADDRS") {
172                Ok(aleo_addrs) => {
173                    let dev_committee =
174                        aleo_addrs.split(',').map(|addr| Address::<N>::from_str(addr).unwrap()).collect();
175                    return Ok(Some(dev_committee));
176                }
177                Err(err) => {
178                    warn!("Failed to load committee peers from environment: {err}");
179                    return Ok(None);
180                }
181            }
182        }
183
184        let now = Instant::now();
185        let (committee, timestamp) = &mut *self.latest_committee.lock().await;
186        if now - *timestamp >= Self::COMMITTEE_REFRESH_TIME {
187            debug!("Updating the validator committee");
188            *timestamp = now;
189            let committe_query_addr = format!("https://api.explorer.provable.com/v2/{}/committee/latest", N::NAME);
190            let response = self.http_client.get(committe_query_addr).send().await?;
191            let json = response.text().await?;
192            let full_committee = Committee::from_str(&json)?;
193            *committee = full_committee.members().keys().copied().collect();
194            debug!("The validator committee has {} members now", committee.len());
195
196            Ok(Some(committee.clone()))
197        } else {
198            Ok(Some(committee.clone()))
199        }
200    }
201
202    // Return the known addresses of current committee members, or all known
203    // validators if the committee info is unavailable.
204    pub async fn get_validator_addrs(&self) -> HashMap<SocketAddr, KnownValidatorInfo<N>> {
205        // First, collect info on all the validators we had connected to before.
206        let mut known_validators = self.known_validators.read().clone();
207        // If the committee info is available, prune non-committee members.
208        match self.get_or_update_committee().await {
209            Ok(Some(committee)) => {
210                known_validators.retain(|_, (aleo_addr, _)| committee.contains(aleo_addr));
211                known_validators
212            }
213            Ok(None) => known_validators,
214            Err(error) => {
215                error!("Couldn't update the validator committee: {error}");
216                known_validators
217            }
218        }
219    }
220
221    /// Shuts down the bootstrap client.
222    pub async fn shut_down(&self) {
223        info!("Shutting down the bootstrap client...");
224
225        // Shut down the low-level network features.
226        self.tcp.shut_down().await;
227
228        // Shut down the node.
229        if let Some(shutdown_tx) = self.shutdown_tx.lock().take() {
230            let _ = shutdown_tx.send(());
231        }
232    }
233}