snarkos_node/bootstrap_client/
mod.rs1mod codec;
17mod handshake;
18mod network;
19
20use crate::{
21 router::{Peer, Resolver},
22 tcp::{self, Tcp},
23};
24use snarkos_account::Account;
25use snarkos_node_tcp::{P2P, protocols::*};
26use snarkvm::{
27 ledger::committee::Committee,
28 prelude::{Address, Field, Header, Network, PrivateKey, ViewKey},
29 synthesizer::Restrictions,
30};
31
32#[cfg(feature = "locktick")]
33use locktick::{
34 parking_lot::{Mutex, RwLock},
35 tokio::Mutex as TMutex,
36};
37#[cfg(not(feature = "locktick"))]
38use parking_lot::{Mutex, RwLock};
39use std::{
40 collections::{HashMap, HashSet},
41 net::SocketAddr,
42 ops::Deref,
43 str::FromStr,
44 sync::Arc,
45 time::{Duration, Instant},
46};
47#[cfg(not(feature = "locktick"))]
48use tokio::sync::Mutex as TMutex;
49use tokio::sync::oneshot;
50
51#[derive(Clone)]
52pub struct BootstrapClient<N: Network>(Arc<InnerBootstrapClient<N>>);
53
54impl<N: Network> Deref for BootstrapClient<N> {
55 type Target = Arc<InnerBootstrapClient<N>>;
56
57 fn deref(&self) -> &Self::Target {
58 &self.0
59 }
60}
61
62pub struct InnerBootstrapClient<N: Network> {
63 tcp: Tcp,
64 peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
65 known_validators: RwLock<HashMap<SocketAddr, Address<N>>>,
66 resolver: RwLock<Resolver<N>>,
67 account: Account<N>,
68 genesis_header: Header<N>,
69 restrictions_id: Field<N>,
70 http_client: reqwest::Client,
71 latest_committee: TMutex<(HashSet<Address<N>>, Instant)>,
72 dev: Option<u16>,
73 shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
74}
75
76impl<N: Network> BootstrapClient<N> {
77 const COMMITTEE_REFRESH_TIME: Duration = Duration::from_secs(20);
79 const CONNECTION_LIFETIME: Duration = Duration::from_secs(15);
81 const MAX_PEERS: u16 = 1_000;
83
84 pub async fn new(
85 listener_addr: SocketAddr,
86 account: Account<N>,
87 genesis_header: Header<N>,
88 dev: Option<u16>,
89 ) -> anyhow::Result<Self> {
90 let tcp = Tcp::new(tcp::Config::new(listener_addr, Self::MAX_PEERS));
92 let peer_pool = Default::default();
94 let known_validators = Default::default();
96 let restrictions_id = Restrictions::load()?.restrictions_id();
98 let resolver = Default::default();
100 let http_client = reqwest::Client::new();
102 let latest_committee = TMutex::new((Default::default(), Instant::now() - Self::COMMITTEE_REFRESH_TIME));
104
105 let (shutdown_tx, shutdown_rx) = oneshot::channel();
107 let shutdown_tx = Mutex::new(Some(shutdown_tx));
108
109 let inner = InnerBootstrapClient {
111 tcp,
112 peer_pool,
113 known_validators,
114 resolver,
115 account,
116 genesis_header,
117 restrictions_id,
118 http_client,
119 latest_committee,
120 dev,
121 shutdown_tx,
122 };
123 let node = BootstrapClient(Arc::new(inner));
124
125 node.enable_handshake().await;
127 node.enable_reading().await;
128 node.enable_writing().await;
129 node.enable_disconnect().await;
130 node.enable_on_connect().await;
131 node.tcp().enable_listener().await.expect("Failed to enable the TCP listener");
133
134 let _ = shutdown_rx.await;
136
137 Ok(node)
138 }
139
140 pub fn address(&self) -> Address<N> {
142 self.account.address()
143 }
144
145 pub fn private_key(&self) -> &PrivateKey<N> {
147 self.account.private_key()
148 }
149
150 pub fn view_key(&self) -> &ViewKey<N> {
152 self.account.view_key()
153 }
154
155 pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
157 self.resolver.read().get_listener(connected_addr)
158 }
159
160 pub fn is_dev(&self) -> bool {
162 self.dev.is_some()
163 }
164
165 pub async fn get_or_update_committee(&self) -> anyhow::Result<Option<HashSet<Address<N>>>> {
168 if self.is_dev() {
170 return Ok(None);
171 }
172
173 let now = Instant::now();
174 let (committee, timestamp) = &mut *self.latest_committee.lock().await;
175 if now - *timestamp >= Self::COMMITTEE_REFRESH_TIME {
176 debug!("Updating the validator committee");
177 *timestamp = now;
178 let committe_query_addr = format!("https://api.explorer.provable.com/v2/{}/committee/latest", N::NAME);
179 let response = self.http_client.get(committe_query_addr).send().await?;
180 let json = response.text().await?;
181 let full_committee = Committee::from_str(&json)?;
182 *committee = full_committee.members().keys().copied().collect();
183 debug!("The validator committee has {} members now", committee.len());
184
185 Ok(Some(committee.clone()))
186 } else {
187 Ok(Some(committee.clone()))
188 }
189 }
190
191 pub fn get_known_validators(&self) -> HashMap<SocketAddr, Address<N>> {
193 self.known_validators.read().clone()
194 }
195
196 pub async fn shut_down(&self) {
198 info!("Shutting down the bootstrap client...");
199
200 self.tcp.shut_down().await;
202
203 if let Some(shutdown_tx) = self.shutdown_tx.lock().take() {
205 let _ = shutdown_tx.send(());
206 }
207 }
208}