snarkos_node/bootstrap_client/
mod.rs1mod codec;
17mod handshake;
18mod network;
19
20use crate::tcp::{self, Tcp};
21use snarkos_account::Account;
22use snarkos_node_network::{ConnectionMode, Peer, Resolver};
23use snarkos_node_tcp::{P2P, protocols::*};
24use snarkvm::{
25 ledger::committee::Committee,
26 prelude::{Address, Field, Header, Network, PrivateKey, ViewKey},
27 synthesizer::Restrictions,
28};
29
30#[cfg(feature = "locktick")]
31use locktick::{
32 parking_lot::{Mutex, RwLock},
33 tokio::Mutex as TMutex,
34};
35#[cfg(not(feature = "locktick"))]
36use parking_lot::{Mutex, RwLock};
37use std::{
38 collections::{HashMap, HashSet},
39 net::SocketAddr,
40 ops::Deref,
41 str::FromStr,
42 sync::Arc,
43 time::{Duration, Instant},
44};
45#[cfg(not(feature = "locktick"))]
46use tokio::sync::Mutex as TMutex;
47use tokio::sync::oneshot;
48
49#[derive(Clone)]
50pub struct BootstrapClient<N: Network>(Arc<InnerBootstrapClient<N>>);
51
52impl<N: Network> Deref for BootstrapClient<N> {
53 type Target = Arc<InnerBootstrapClient<N>>;
54
55 fn deref(&self) -> &Self::Target {
56 &self.0
57 }
58}
59
/// The data stored per socket address in `InnerBootstrapClient::known_validators`:
/// an Aleo address paired with a connection mode.
type KnownValidatorInfo<N> = (Address<N>, ConnectionMode);
62
/// The shared state behind a [`BootstrapClient`] handle.
pub struct InnerBootstrapClient<N: Network> {
    /// The TCP stack, configured in `BootstrapClient::new` with the listener address and peer limit.
    tcp: Tcp,
    /// Peers, keyed by socket address.
    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
    /// Known validators, keyed by socket address; the value is their Aleo address and connection mode.
    known_validators: RwLock<HashMap<SocketAddr, KnownValidatorInfo<N>>>,
    /// The resolver queried by `resolve_to_listener` to map a connected address to a listener address.
    resolver: RwLock<Resolver<N>>,
    /// The account of this node; source of its address, private key and view key.
    account: Account<N>,
    /// The genesis block header supplied at construction.
    genesis_header: Header<N>,
    /// The restrictions ID obtained from `Restrictions::load()` at construction.
    restrictions_id: Field<N>,
    /// The HTTP client used to query the latest committee.
    http_client: reqwest::Client,
    /// The cached set of committee member addresses and the instant of the last refresh attempt.
    /// Guarded by an async mutex because the lock is held across the HTTP request.
    latest_committee: TMutex<(HashSet<Address<N>>, Instant)>,
    /// The optional dev value; `is_dev` reports whether it is set.
    dev: Option<u16>,
    /// The sender half of the one-shot shutdown channel; taken and fired by `shut_down`.
    shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
}
76
77impl<N: Network> BootstrapClient<N> {
78 const COMMITTEE_REFRESH_TIME: Duration = Duration::from_secs(20);
80 const CONNECTION_LIFETIME: Duration = Duration::from_secs(15);
82 const MAX_PEERS: u16 = 1_000;
84
85 pub async fn new(
86 listener_addr: SocketAddr,
87 account: Account<N>,
88 genesis_header: Header<N>,
89 dev: Option<u16>,
90 ) -> anyhow::Result<Self> {
91 let tcp = Tcp::new(tcp::Config::new(listener_addr, Self::MAX_PEERS));
93 let peer_pool = Default::default();
95 let known_validators = Default::default();
97 let restrictions_id = Restrictions::load()?.restrictions_id();
99 let resolver = Default::default();
101 let http_client = reqwest::Client::new();
103 let latest_committee = TMutex::new((Default::default(), Instant::now() - Self::COMMITTEE_REFRESH_TIME));
105
106 let (shutdown_tx, shutdown_rx) = oneshot::channel();
108 let shutdown_tx = Mutex::new(Some(shutdown_tx));
109
110 let inner = InnerBootstrapClient {
112 tcp,
113 peer_pool,
114 known_validators,
115 resolver,
116 account,
117 genesis_header,
118 restrictions_id,
119 http_client,
120 latest_committee,
121 dev,
122 shutdown_tx,
123 };
124 let node = BootstrapClient(Arc::new(inner));
125
126 node.enable_handshake().await;
128 node.enable_reading().await;
129 node.enable_writing().await;
130 node.enable_disconnect().await;
131 node.enable_on_connect().await;
132 node.tcp().enable_listener().await.expect("Failed to enable the TCP listener");
134
135 let _ = shutdown_rx.await;
137
138 Ok(node)
139 }
140
141 pub fn address(&self) -> Address<N> {
143 self.account.address()
144 }
145
146 pub fn private_key(&self) -> &PrivateKey<N> {
148 self.account.private_key()
149 }
150
151 pub fn view_key(&self) -> &ViewKey<N> {
153 self.account.view_key()
154 }
155
156 pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
158 self.resolver.read().get_listener(connected_addr)
159 }
160
161 pub fn is_dev(&self) -> bool {
163 self.dev.is_some()
164 }
165
166 pub async fn get_or_update_committee(&self) -> anyhow::Result<Option<HashSet<Address<N>>>> {
169 if cfg!(feature = "test") || self.is_dev() {
171 match std::env::var("TEST_COMMITTEE_ADDRS") {
172 Ok(aleo_addrs) => {
173 let dev_committee =
174 aleo_addrs.split(',').map(|addr| Address::<N>::from_str(addr).unwrap()).collect();
175 return Ok(Some(dev_committee));
176 }
177 Err(err) => {
178 warn!("Failed to load committee peers from environment: {err}");
179 return Ok(None);
180 }
181 }
182 }
183
184 let now = Instant::now();
185 let (committee, timestamp) = &mut *self.latest_committee.lock().await;
186 if now - *timestamp >= Self::COMMITTEE_REFRESH_TIME {
187 debug!("Updating the validator committee");
188 *timestamp = now;
189 let committe_query_addr = format!("https://api.explorer.provable.com/v2/{}/committee/latest", N::NAME);
190 let response = self.http_client.get(committe_query_addr).send().await?;
191 let json = response.text().await?;
192 let full_committee = Committee::from_str(&json)?;
193 *committee = full_committee.members().keys().copied().collect();
194 debug!("The validator committee has {} members now", committee.len());
195
196 Ok(Some(committee.clone()))
197 } else {
198 Ok(Some(committee.clone()))
199 }
200 }
201
202 pub async fn get_validator_addrs(&self) -> HashMap<SocketAddr, KnownValidatorInfo<N>> {
205 let mut known_validators = self.known_validators.read().clone();
207 match self.get_or_update_committee().await {
209 Ok(Some(committee)) => {
210 known_validators.retain(|_, (aleo_addr, _)| committee.contains(aleo_addr));
211 known_validators
212 }
213 Ok(None) => known_validators,
214 Err(error) => {
215 error!("Couldn't update the validator committee: {error}");
216 known_validators
217 }
218 }
219 }
220
221 pub async fn shut_down(&self) {
223 info!("Shutting down the bootstrap client...");
224
225 self.tcp.shut_down().await;
227
228 if let Some(shutdown_tx) = self.shutdown_tx.lock().take() {
230 let _ = shutdown_tx.send(());
231 }
232 }
233}