snarkos_node/bootstrap_client/
mod.rs1mod codec;
17mod handshake;
18mod network;
19
20use crate::tcp::{self, Tcp};
21use snarkos_account::Account;
22use snarkos_node_network::{ConnectionMode, Peer, Resolver};
23use snarkos_node_tcp::{P2P, protocols::*};
24use snarkos_utilities::SignalHandler;
25use snarkvm::{
26 ledger::committee::Committee,
27 prelude::{Address, Field, Header, Network, PrivateKey, ViewKey},
28 synthesizer::Restrictions,
29};
30
31#[cfg(feature = "locktick")]
32use locktick::{
33 parking_lot::{Mutex, RwLock},
34 tokio::Mutex as TMutex,
35};
36#[cfg(not(feature = "locktick"))]
37use parking_lot::{Mutex, RwLock};
38use std::{
39 collections::{HashMap, HashSet},
40 net::SocketAddr,
41 ops::Deref,
42 str::FromStr,
43 sync::Arc,
44 time::{Duration, Instant},
45};
46#[cfg(not(feature = "locktick"))]
47use tokio::sync::Mutex as TMutex;
48use tokio::sync::oneshot;
49
50#[derive(Clone)]
51pub struct BootstrapClient<N: Network>(Arc<InnerBootstrapClient<N>>);
52
53impl<N: Network> Deref for BootstrapClient<N> {
54 type Target = Arc<InnerBootstrapClient<N>>;
55
56 fn deref(&self) -> &Self::Target {
57 &self.0
58 }
59}
60
61type KnownValidatorInfo<N> = (Address<N>, ConnectionMode);
63
64pub struct InnerBootstrapClient<N: Network> {
65 tcp: Tcp,
66 peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
67 known_validators: RwLock<HashMap<SocketAddr, KnownValidatorInfo<N>>>,
68 resolver: RwLock<Resolver<N>>,
69 account: Account<N>,
70 genesis_header: Header<N>,
71 restrictions_id: Field<N>,
72 http_client: reqwest::Client,
73 latest_committee: TMutex<(HashSet<Address<N>>, Instant)>,
74 dev: Option<u16>,
75 shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
76}
77
78impl<N: Network> BootstrapClient<N> {
79 const COMMITTEE_REFRESH_TIME: Duration = Duration::from_secs(20);
81 const CONNECTION_LIFETIME: Duration = Duration::from_secs(15);
83 const MAX_PEERS: u16 = 1_000;
85
86 pub async fn new(
87 listener_addr: SocketAddr,
88 account: Account<N>,
89 genesis_header: Header<N>,
90 dev: Option<u16>,
91 ) -> anyhow::Result<Self> {
92 let tcp = Tcp::new(tcp::Config::new(listener_addr, Self::MAX_PEERS));
94 let peer_pool = Default::default();
96 let known_validators = Default::default();
98 let restrictions_id = Restrictions::load()?.restrictions_id();
100 let resolver = Default::default();
102 let http_client = reqwest::Client::new();
104 let latest_committee = TMutex::new((Default::default(), Instant::now() - Self::COMMITTEE_REFRESH_TIME));
106
107 let (shutdown_tx, shutdown_rx) = oneshot::channel();
109 let shutdown_tx = Mutex::new(Some(shutdown_tx));
110
111 let inner = InnerBootstrapClient {
113 tcp,
114 peer_pool,
115 known_validators,
116 resolver,
117 account,
118 genesis_header,
119 restrictions_id,
120 http_client,
121 latest_committee,
122 dev,
123 shutdown_tx,
124 };
125 let node = BootstrapClient(Arc::new(inner));
126
127 node.enable_handshake().await;
129 node.enable_reading().await;
130 node.enable_writing().await;
131 node.enable_disconnect().await;
132 node.enable_on_connect().await;
133 node.tcp().enable_listener().await.expect("Failed to enable the TCP listener");
135
136 let _ = shutdown_rx.await;
138
139 Ok(node)
140 }
141
142 pub fn address(&self) -> Address<N> {
144 self.account.address()
145 }
146
147 pub fn private_key(&self) -> &PrivateKey<N> {
149 self.account.private_key()
150 }
151
152 pub fn view_key(&self) -> &ViewKey<N> {
154 self.account.view_key()
155 }
156
157 pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
159 self.resolver.read().get_listener(connected_addr)
160 }
161
162 pub fn is_dev(&self) -> bool {
164 self.dev.is_some()
165 }
166
167 pub async fn get_or_update_committee(&self) -> anyhow::Result<Option<HashSet<Address<N>>>> {
170 if cfg!(feature = "test") || self.is_dev() {
172 match std::env::var("TEST_COMMITTEE_ADDRS") {
173 Ok(aleo_addrs) => {
174 let dev_committee =
175 aleo_addrs.split(',').map(|addr| Address::<N>::from_str(addr).unwrap()).collect();
176 return Ok(Some(dev_committee));
177 }
178 Err(err) => {
179 warn!("Failed to load committee peers from environment: {err}");
180 return Ok(None);
181 }
182 }
183 }
184
185 let now = Instant::now();
186 let (committee, timestamp) = &mut *self.latest_committee.lock().await;
187 if now - *timestamp >= Self::COMMITTEE_REFRESH_TIME {
188 debug!("Updating the validator committee");
189 *timestamp = now;
190 let committe_query_addr =
191 format!("https://api.explorer.provable.com/v2/{}/committee/latest", N::SHORT_NAME);
192 let response = self.http_client.get(committe_query_addr).send().await?;
193 debug!("Received response from the explorer: {:?}", response);
194 let json = response.text().await?;
195 let full_committee = Committee::from_str(&json)?;
196 *committee = full_committee.members().keys().copied().collect();
197 debug!("The validator committee has {} members now", committee.len());
198
199 Ok(Some(committee.clone()))
200 } else {
201 Ok(Some(committee.clone()))
202 }
203 }
204
205 pub async fn get_validator_addrs(&self) -> HashMap<SocketAddr, KnownValidatorInfo<N>> {
208 let mut known_validators = self.known_validators.read().clone();
210 match self.get_or_update_committee().await {
212 Ok(Some(committee)) => {
213 known_validators.retain(|_, (aleo_addr, _)| committee.contains(aleo_addr));
214 known_validators
215 }
216 Ok(None) => known_validators,
217 Err(error) => {
218 error!("Couldn't update the validator committee: {error}");
219 known_validators
220 }
221 }
222 }
223
224 pub async fn shut_down(&self) {
226 info!("Shutting down the bootstrap client...");
227
228 self.tcp.shut_down().await;
230
231 if let Some(shutdown_tx) = self.shutdown_tx.lock().take() {
233 let _ = shutdown_tx.send(());
234 }
235 }
236
237 pub async fn wait_for_signals(&self, handler: &SignalHandler) {
239 handler.wait_for_signals().await;
240
241 self.shut_down().await;
243 }
244}