1mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::ledger_service::CoreLedgerService;
21use snarkos_node_rest::Rest;
22use snarkos_node_router::{
23 Heartbeat,
24 Inbound,
25 Outbound,
26 Router,
27 Routing,
28 messages::{Message, NodeType, UnconfirmedSolution, UnconfirmedTransaction},
29};
30use snarkos_node_sync::{BlockSync, BlockSyncMode};
31use snarkos_node_tcp::{
32 P2P,
33 protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
34};
35use snarkvm::{
36 console::network::Network,
37 ledger::{
38 Ledger,
39 block::{Block, Header},
40 puzzle::{Puzzle, Solution, SolutionID},
41 store::ConsensusStorage,
42 },
43 prelude::block::Transaction,
44};
45
46use aleo_std::StorageMode;
47use anyhow::Result;
48use core::future::Future;
49#[cfg(feature = "locktick")]
50use locktick::parking_lot::Mutex;
51use lru::LruCache;
52#[cfg(not(feature = "locktick"))]
53use parking_lot::Mutex;
54use std::{
55 net::SocketAddr,
56 num::NonZeroUsize,
57 sync::{
58 Arc,
59 atomic::{
60 AtomicBool,
61 AtomicUsize,
62 Ordering::{Acquire, Relaxed},
63 },
64 },
65 time::Duration,
66};
67use tokio::{task::JoinHandle, time::sleep};
68
/// The maximum number of deployment verifications to process in parallel.
const MAX_PARALLEL_DEPLOY_VERIFICATIONS: usize = 5;
/// The maximum number of execution verifications to process in parallel.
const MAX_PARALLEL_EXECUTE_VERIFICATIONS: usize = 1000;
/// The maximum number of solution verifications to process in parallel.
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
/// The capacity of the LRU queue holding pending (unverified) deployments.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity of the LRU queue holding pending (unverified) executions.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity of the LRU queue holding pending (unverified) solutions.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;

/// A queued unconfirmed transaction: the origin peer, the wire message, and the parsed transaction.
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
/// A queued unconfirmed solution: the origin peer, the wire message, and the parsed solution.
type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);
/// A client node: verifies and propagates unconfirmed solutions and transactions,
/// and keeps its ledger synchronized with the network.
#[derive(Clone)]
pub struct Client<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node.
    ledger: Ledger<N, C>,
    /// The router (peer connection manager) of the node.
    router: Router<N>,
    /// The REST server of the node, if one was requested at construction.
    rest: Option<Rest<N, C, Self>>,
    /// The block synchronization module.
    sync: Arc<BlockSync<N>>,
    /// The genesis block of the network.
    genesis: Block<N>,
    /// The puzzle instance used to verify incoming solutions.
    puzzle: Puzzle<N>,
    /// LRU queue of solutions awaiting verification, keyed by solution ID.
    solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
    /// LRU queue of deployment transactions awaiting verification, keyed by transaction ID.
    deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// LRU queue of execution transactions awaiting verification, keyed by transaction ID.
    execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The number of solution verifications currently in flight (bounded by MAX_PARALLEL_SOLUTION_VERIFICATIONS).
    num_verifying_solutions: Arc<AtomicUsize>,
    /// The number of deployment verifications currently in flight (bounded by MAX_PARALLEL_DEPLOY_VERIFICATIONS).
    num_verifying_deploys: Arc<AtomicUsize>,
    /// The number of execution verifications currently in flight (bounded by MAX_PARALLEL_EXECUTE_VERIFICATIONS).
    num_verifying_executions: Arc<AtomicUsize>,
    /// Handles to all spawned background tasks, aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The shutdown flag, shared with the background tasks.
    shutdown: Arc<AtomicBool>,
}
127
impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// Initializes a new client node.
    ///
    /// The startup order matters: the ledger is loaded (and optionally fast-synced from a CDN)
    /// before the router and sync modules are created, the REST server is started before
    /// routing is initialized, and the fully-built node is registered with the signal handler
    /// so Ctrl-C can trigger a clean shutdown.
    pub async fn new(
        node_ip: SocketAddr,
        rest_ip: Option<SocketAddr>,
        rest_rps: u32,
        account: Account<N>,
        trusted_peers: &[SocketAddr],
        genesis: Block<N>,
        cdn: Option<String>,
        storage_mode: StorageMode,
        rotate_external_peers: bool,
        shutdown: Arc<AtomicBool>,
    ) -> Result<Self> {
        // Register the signal handler first, so an early Ctrl-C is still caught;
        // the node instance is attached to it at the end of this function.
        let signal_node = Self::handle_signals(shutdown.clone());

        // Load the ledger from storage, starting from the genesis block.
        let ledger = Ledger::<N, C>::load(genesis.clone(), storage_mode.clone())?;

        // If a CDN base URL was provided, fast-sync the ledger from it before going online.
        if let Some(base_url) = cdn {
            if let Err((_, error)) =
                snarkos_node_cdn::sync_ledger_with_cdn(&base_url, ledger.clone(), shutdown.clone()).await
            {
                // A failed CDN sync can leave partial state; log how to clean it up, then bail.
                crate::log_clean_error(&storage_mode);
                return Err(error);
            }
        }

        // Wrap the ledger in the service interface shared by the router and sync modules.
        let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), shutdown.clone()));
        // Clients always allow connections from peers outside the trusted set.
        let allow_external_peers = true;

        // Initialize the router for peer-to-peer networking.
        let router = Router::new(
            node_ip,
            NodeType::Client,
            account,
            ledger_service.clone(),
            trusted_peers,
            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
            rotate_external_peers,
            allow_external_peers,
            // Development mode is inferred from the storage mode.
            matches!(storage_mode, StorageMode::Development(_)),
        )
        .await?;

        // Initialize block synchronization in router mode (no BFT participation).
        let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone());

        // Assemble the node with empty verification queues and zeroed in-flight counters.
        let mut node = Self {
            ledger: ledger.clone(),
            router,
            rest: None,
            sync: Arc::new(sync),
            genesis,
            puzzle: ledger.puzzle().clone(),
            solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
            execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
            num_verifying_solutions: Default::default(),
            num_verifying_deploys: Default::default(),
            num_verifying_executions: Default::default(),
            handles: Default::default(),
            shutdown,
        };

        // Start the REST server, if an address was provided.
        if let Some(rest_ip) = rest_ip {
            node.rest = Some(Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone())).await?);
        }
        // Bring up message routing, then the background verification/sync tasks.
        node.initialize_routing().await;
        node.initialize_sync();
        node.initialize_solution_verification();
        node.initialize_deploy_verification();
        node.initialize_execute_verification();
        // Periodic notification messages run as another tracked background task.
        node.handles.lock().push(crate::start_notification_message_loop());
        // Attach the node to the signal handler registered above.
        let _ = signal_node.set(node.clone());
        Ok(node)
    }

    /// Returns a reference to the ledger.
    pub fn ledger(&self) -> &Ledger<N, C> {
        &self.ledger
    }

    /// Returns a reference to the REST server, if one is running.
    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
        &self.rest
    }
}
231
232impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
233 fn initialize_sync(&self) {
235 let node = self.clone();
237 self.handles.lock().push(tokio::spawn(async move {
238 loop {
239 if node.shutdown.load(std::sync::atomic::Ordering::Acquire) {
241 info!("Shutting down block production");
242 break;
243 }
244
245 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
247 node.sync.try_block_sync(&node).await;
249 }
250 }));
251 }
252
253 fn initialize_solution_verification(&self) {
255 let node = self.clone();
257 self.handles.lock().push(tokio::spawn(async move {
258 loop {
259 if node.shutdown.load(Acquire) {
261 info!("Shutting down solution verification");
262 break;
263 }
264
265 let queue_is_empty = node.solution_queue.lock().is_empty();
267 let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;
269
270 if queue_is_empty || counter_is_full {
272 sleep(Duration::from_millis(50)).await;
273 continue;
274 }
275
276 let mut solution_queue = node.solution_queue.lock();
278 while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
279 let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
281 let _node = node.clone();
282 tokio::task::spawn_blocking(move || {
284 if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
286 let proof_target = _node.ledger.latest_block().header().proof_target();
288 let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);
290
291 match is_valid {
292 Ok(()) => {
294 let message = Message::UnconfirmedSolution(serialized);
295 _node.propagate(message, &[peer_ip]);
297 }
298 Err(error) => {
300 if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
301 debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
302 }
303 }
304 }
305 } else {
306 warn!("Failed to retrieve the latest epoch hash.");
307 }
308 _node.num_verifying_solutions.fetch_sub(1, Relaxed);
310 });
311 if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
313 break;
314 }
315 }
316 }
317 }));
318 }
319
320 fn initialize_deploy_verification(&self) {
322 let node = self.clone();
324 self.handles.lock().push(tokio::spawn(async move {
325 loop {
326 if node.shutdown.load(Acquire) {
328 info!("Shutting down deployment verification");
329 break;
330 }
331
332 let queue_is_empty = node.deploy_queue.lock().is_empty();
334 let counter_is_full = node.num_verifying_deploys.load(Acquire) >= MAX_PARALLEL_DEPLOY_VERIFICATIONS;
336
337 if queue_is_empty || counter_is_full {
339 sleep(Duration::from_millis(50)).await;
340 continue;
341 }
342
343 while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
345 let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
347 let _node = node.clone();
348 tokio::task::spawn_blocking(move || {
350 match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
352 Ok(_) => {
353 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
355 }
356 Err(error) => {
357 debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
358 }
359 }
360 _node.num_verifying_deploys.fetch_sub(1, Relaxed);
362 });
363 if previous_counter + 1 >= MAX_PARALLEL_DEPLOY_VERIFICATIONS {
365 break;
366 }
367 }
368 }
369 }));
370 }
371
372 fn initialize_execute_verification(&self) {
374 let node = self.clone();
376 self.handles.lock().push(tokio::spawn(async move {
377 loop {
378 if node.shutdown.load(Acquire) {
380 info!("Shutting down execution verification");
381 break;
382 }
383
384 let queue_is_empty = node.execute_queue.lock().is_empty();
386 let counter_is_full = node.num_verifying_executions.load(Acquire) >= MAX_PARALLEL_EXECUTE_VERIFICATIONS;
388
389 if queue_is_empty || counter_is_full {
391 sleep(Duration::from_millis(50)).await;
392 continue;
393 }
394
395 while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
397 let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
399 let _node = node.clone();
400 tokio::task::spawn_blocking(move || {
402 match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
404 Ok(_) => {
405 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
407 }
408 Err(error) => {
409 debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
410 }
411 }
412 _node.num_verifying_executions.fetch_sub(1, Relaxed);
414 });
415 if previous_counter + 1 >= MAX_PARALLEL_EXECUTE_VERIFICATIONS {
417 break;
418 }
419 }
420 }
421 }));
422 }
423
424 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
426 self.handles.lock().push(tokio::spawn(future));
427 }
428}
429
430#[async_trait]
431impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
432 async fn shut_down(&self) {
434 info!("Shutting down...");
435
436 trace!("Shutting down the node...");
438 self.shutdown.store(true, std::sync::atomic::Ordering::Release);
439
440 trace!("Shutting down the validator...");
442 self.handles.lock().iter().for_each(|handle| handle.abort());
443
444 self.router.shut_down().await;
446
447 info!("Node has shut down.");
448 }
449}