snarkos_node/bootstrap_client/
mod.rs1mod codec;
17mod handshake;
18mod network;
19
20use crate::tcp::{self, Tcp};
21use snarkos_account::Account;
22use snarkos_node_network::{ConnectionMode, Peer, Resolver};
23use snarkos_node_tcp::{P2P, protocols::*};
24use snarkos_utilities::SignalHandler;
25use snarkvm::{
26 ledger::committee::Committee,
27 prelude::{Address, Field, Header, Network, PrivateKey, ViewKey},
28 synthesizer::Restrictions,
29};
30
31#[cfg(feature = "locktick")]
32use locktick::{parking_lot::RwLock, tokio::Mutex as TMutex};
33#[cfg(not(feature = "locktick"))]
34use parking_lot::RwLock;
35use std::{
36 collections::{HashMap, HashSet},
37 net::SocketAddr,
38 ops::Deref,
39 str::FromStr,
40 sync::Arc,
41 time::{Duration, Instant},
42};
43#[cfg(not(feature = "locktick"))]
44use tokio::sync::Mutex as TMutex;
45
46#[derive(Clone)]
47pub struct BootstrapClient<N: Network>(Arc<InnerBootstrapClient<N>>);
48
49impl<N: Network> Deref for BootstrapClient<N> {
50 type Target = Arc<InnerBootstrapClient<N>>;
51
52 fn deref(&self) -> &Self::Target {
53 &self.0
54 }
55}
56
57type KnownValidatorInfo<N> = (Address<N>, ConnectionMode);
59
60pub struct InnerBootstrapClient<N: Network> {
61 tcp: Tcp,
62 peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
63 known_validators: RwLock<HashMap<SocketAddr, KnownValidatorInfo<N>>>,
64 resolver: RwLock<Resolver<N>>,
65 account: Account<N>,
66 genesis_header: Header<N>,
67 restrictions_id: Field<N>,
68 http_client: reqwest::Client,
69 latest_committee: TMutex<(HashSet<Address<N>>, Instant)>,
70 dev: Option<u16>,
71}
72
73impl<N: Network> BootstrapClient<N> {
74 const COMMITTEE_REFRESH_TIME: Duration = Duration::from_secs(20);
76 const CONNECTION_LIFETIME: Duration = Duration::from_secs(15);
78 const MAX_PEERS: u16 = 1_000;
80
81 pub async fn new(
82 listener_addr: SocketAddr,
83 account: Account<N>,
84 genesis_header: Header<N>,
85 dev: Option<u16>,
86 ) -> anyhow::Result<Self> {
87 let tcp = Tcp::new(tcp::Config::new(listener_addr, Self::MAX_PEERS));
89 let peer_pool = Default::default();
91 let known_validators = Default::default();
93 let restrictions_id = Restrictions::load()?.restrictions_id();
95 let resolver = Default::default();
97 let http_client = reqwest::Client::new();
99 let latest_committee = TMutex::new((Default::default(), Instant::now() - Self::COMMITTEE_REFRESH_TIME));
101
102 let inner = InnerBootstrapClient {
104 tcp,
105 peer_pool,
106 known_validators,
107 resolver,
108 account,
109 genesis_header,
110 restrictions_id,
111 http_client,
112 latest_committee,
113 dev,
114 };
115 let node = BootstrapClient(Arc::new(inner));
116
117 node.enable_handshake().await;
119 node.enable_reading().await;
120 node.enable_writing().await;
121 node.enable_disconnect().await;
122 node.enable_on_connect().await;
123 node.tcp().enable_listener().await.expect("Failed to enable the TCP listener");
125
126 Ok(node)
127 }
128
129 pub fn address(&self) -> Address<N> {
131 self.account.address()
132 }
133
134 pub fn private_key(&self) -> &PrivateKey<N> {
136 self.account.private_key()
137 }
138
139 pub fn view_key(&self) -> &ViewKey<N> {
141 self.account.view_key()
142 }
143
144 pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
146 self.resolver.read().get_listener(connected_addr)
147 }
148
149 pub fn is_dev(&self) -> bool {
151 self.dev.is_some()
152 }
153
154 pub async fn get_or_update_committee(&self) -> anyhow::Result<Option<HashSet<Address<N>>>> {
157 if cfg!(feature = "test") || self.is_dev() {
159 match std::env::var("TEST_COMMITTEE_ADDRS") {
160 Ok(aleo_addrs) => {
161 let dev_committee =
162 aleo_addrs.split(',').map(|addr| Address::<N>::from_str(addr).unwrap()).collect();
163 return Ok(Some(dev_committee));
164 }
165 Err(err) => {
166 warn!("Failed to load committee peers from environment: {err}");
167 return Ok(None);
168 }
169 }
170 }
171
172 let now = Instant::now();
173 let (committee, timestamp) = &mut *self.latest_committee.lock().await;
174 if now - *timestamp >= Self::COMMITTEE_REFRESH_TIME {
175 debug!("Updating the validator committee");
176 *timestamp = now;
177 let committe_query_addr =
178 format!("https://api.explorer.provable.com/v2/{}/committee/latest", N::SHORT_NAME);
179 let response = self.http_client.get(committe_query_addr).send().await?;
180 debug!("Received response from the explorer: {:?}", response);
181 let json = response.text().await?;
182 let full_committee = Committee::from_str(&json)?;
183 *committee = full_committee.members().keys().copied().collect();
184 debug!("The validator committee has {} members now", committee.len());
185
186 Ok(Some(committee.clone()))
187 } else {
188 Ok(Some(committee.clone()))
189 }
190 }
191
192 pub async fn get_validator_addrs(&self) -> HashMap<SocketAddr, KnownValidatorInfo<N>> {
195 let mut known_validators = self.known_validators.read().clone();
197 match self.get_or_update_committee().await {
199 Ok(Some(committee)) => {
200 known_validators.retain(|_, (aleo_addr, _)| committee.contains(aleo_addr));
201 known_validators
202 }
203 Ok(None) => known_validators,
204 Err(error) => {
205 error!("Couldn't update the validator committee: {error}");
206 known_validators
207 }
208 }
209 }
210
211 pub async fn shut_down(&self) {
213 info!("Shutting down the bootstrap client...");
214
215 self.tcp.shut_down().await;
217 }
218
219 pub async fn wait_for_signals(&self, handler: &SignalHandler) {
221 handler.wait_for_signals().await;
222
223 self.shut_down().await;
225 }
226}