mod router;

use crate::traits::NodeInterface;

use snarkos_account::Account;
use snarkos_node_bft::{events::DataBlocks, helpers::fmt_id, ledger_service::CoreLedgerService};
use snarkos_node_cdn::CdnBlockSync;
use snarkos_node_network::NodeType;
use snarkos_node_rest::Rest;
use snarkos_node_router::{
    Heartbeat,
    Inbound,
    Outbound,
    Router,
    Routing,
    messages::{Message, UnconfirmedSolution, UnconfirmedTransaction},
};
use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
use snarkos_node_tcp::{
    P2P,
    protocols::{Disconnect, Handshake, OnConnect, Reading},
};
use snarkvm::{
    console::network::Network,
    ledger::{
        Ledger,
        block::{Block, Header},
        puzzle::{Puzzle, Solution, SolutionID},
        store::ConsensusStorage,
    },
    prelude::{VM, block::Transaction},
    utilities::log_error,
};

use aleo_std::StorageMode;
use anyhow::Result;
use core::future::Future;
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::parking_lot::Mutex;
use lru::LruCache;
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
use std::{
    net::SocketAddr,
    num::NonZeroUsize,
    sync::{
        Arc,
        atomic::{
            AtomicBool,
            AtomicUsize,
            Ordering::{Acquire, Relaxed},
        },
    },
    time::Duration,
};
use tokio::{
    task::JoinHandle,
    time::{sleep, timeout},
};

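/// The maximum number of solutions that can be verified in parallel.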
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
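
/// The capacity of the queue reserved for deployments.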
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
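
/// The capacity of the queue reserved for executions.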
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
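
/// The capacity of the queue reserved for solutions.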
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;

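/// The contents of an unconfirmed transaction held in a verification queue: the sending peer, the original message, and the deserialized transaction.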
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
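
/// The contents of an unconfirmed solution held in the verification queue: the sending peer, the original message, and the deserialized solution.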
type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);

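/// A client node is a full node that synchronizes the ledger with the network and verifies unconfirmed solutions and transactions before propagating them.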
#[derive(Clone)]
pub struct Client<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node.
    ledger: Ledger<N, C>,
    /// The router of the node.
    router: Router<N>,
    /// The REST server of the node.
    rest: Option<Rest<N, C, Self>>,
    /// The block synchronization logic of the node.
    sync: Arc<BlockSync<N>>,
    /// The genesis block.
    genesis: Block<N>,
    /// The puzzle.
    puzzle: Puzzle<N>,
    /// The queue of unconfirmed solutions awaiting verification.
    solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
    /// The queue of unconfirmed deployments awaiting verification.
    deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The queue of unconfirmed executions awaiting verification.
    execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The number of solutions currently being verified.
    num_verifying_solutions: Arc<AtomicUsize>,
    /// The number of deployments currently being verified.
    num_verifying_deploys: Arc<AtomicUsize>,
    /// The number of executions currently being verified.
    num_verifying_executions: Arc<AtomicUsize>,
    /// The handles of the spawned tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The shutdown signal.
    shutdown: Arc<AtomicBool>,
    /// The ping logic for broadcasting block locators to peers.
    ping: Arc<Ping<N>>,
}

impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
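    /// Initializes a new client node.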
    pub async fn new(
        node_ip: SocketAddr,
        rest_ip: Option<SocketAddr>,
        rest_rps: u32,
        account: Account<N>,
        trusted_peers: &[SocketAddr],
        genesis: Block<N>,
        cdn: Option<http::Uri>,
        storage_mode: StorageMode,
        trusted_peers_only: bool,
        dev: Option<u16>,
        shutdown: Arc<AtomicBool>,
    ) -> Result<Self> {
        // Initialize the signal handler.
        let signal_node = Self::handle_signals(shutdown.clone());

        // Initialize the ledger.
        let ledger = Ledger::<N, C>::load(genesis.clone(), storage_mode.clone())?;

        // Initialize the ledger service.
        let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), shutdown.clone()));

        // Initialize the node router.
        let router = Router::new(
            node_ip,
            NodeType::Client,
            account,
            ledger_service.clone(),
            trusted_peers,
            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
            trusted_peers_only,
            storage_mode.clone(),
            dev.is_some(),
        )
        .await?;

        // Initialize the block synchronization logic.
        let sync = Arc::new(BlockSync::new(ledger_service.clone()));

        // Initialize the ping logic with the current block locators.
        let locators = sync.get_block_locators()?;
        let ping = Arc::new(Ping::new(router.clone(), locators));

        // Initialize the node.
        let mut node = Self {
            ledger: ledger.clone(),
            router,
            rest: None,
            sync: sync.clone(),
            genesis,
            ping,
            puzzle: ledger.puzzle().clone(),
            solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
            execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
            num_verifying_solutions: Default::default(),
            num_verifying_deploys: Default::default(),
            num_verifying_executions: Default::default(),
            handles: Default::default(),
            shutdown: shutdown.clone(),
        };

        // If a CDN was configured, initialize the CDN block sync.
        let cdn_sync = cdn.map(|base_url| {
            trace!("CDN sync is enabled");
            Arc::new(CdnBlockSync::new(base_url, ledger.clone(), shutdown))
        });

        // Initialize the REST server, if enabled.
        if let Some(rest_ip) = rest_ip {
            node.rest = Some(
                Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone()), cdn_sync.clone(), sync)
                    .await?,
            );
        }

        // Wait for the CDN sync to complete, shutting down the node on failure.
        if let Some(cdn_sync) = cdn_sync {
            if let Err(error) = cdn_sync.wait().await {
                crate::log_clean_error(&storage_mode);
                node.shut_down().await;
                return Err(error);
            }
        }

        // Initialize the routing, synchronization, and verification tasks.
        node.initialize_routing().await;
        node.initialize_sync();
        node.initialize_solution_verification();
        node.initialize_deploy_verification();
        node.initialize_execute_verification();
        node.handles.lock().push(crate::start_notification_message_loop());
        // Pass the node to the signal handler, and return it.
        let _ = signal_node.set(node.clone());
        Ok(node)
    }

    /// Returns the ledger.
    pub fn ledger(&self) -> &Ledger<N, C> {
        &self.ledger
    }

    /// Returns the REST server.
    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
        &self.rest
    }

    /// Returns the router.
    pub fn router(&self) -> &Router<N> {
        &self.router
    }
}

impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
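    /// The maximum amount of time to wait for a sync event before polling again.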
    const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);

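    /// Initializes the block synchronization tasks: one task issues block requests,
    /// and another processes the received block responses.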
    fn initialize_sync(&self) {
        // Start the block request generation loop.
        let self_ = self.clone();
        self.spawn(async move {
            while !self_.shutdown.load(Acquire) {
                self_.try_issuing_block_requests().await;
            }

            info!("Stopped block request generation");
        });

        // Start the block response processing loop.
        let self_ = self.clone();
        self.spawn(async move {
            while !self_.shutdown.load(Acquire) {
                // Wait for incoming block responses, or time out so the shutdown flag is re-checked.
                let _ = timeout(Self::MAX_SYNC_INTERVAL, self_.sync.wait_for_block_responses()).await;

                self_.try_advancing_block_synchronization().await;
            }

            debug!("Stopped block response processing");
        });
    }

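    /// Attempts to advance the ledger with blocks received from peers,
    /// updating the broadcasted block locators when new blocks are added.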
    async fn try_advancing_block_synchronization(&self) {
        // Try to extend the ledger with the block responses received so far.
        let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
            Ok(val) => val,
            Err(err) => {
                error!("Block synchronization failed - {err}");
                return;
            }
        };

        // If the ledger advanced, refresh the block locators sent in pings.
        if has_new_blocks {
            match self.sync.get_block_locators() {
                Ok(locators) => self.ping.update_block_locators(locators),
                Err(err) => error!("Failed to get block locators: {err}"),
            }
        }
    }

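    /// Waits for a sync event, re-issues any timed-out block requests, and
    /// sends new block requests if the node is still behind.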
    async fn try_issuing_block_requests(&self) {
        // Wait for a peer update, or time out so the shutdown flag is re-checked.
        let _ = timeout(Self::MAX_SYNC_INTERVAL, self.sync.wait_for_peer_update()).await;

        // Keep the sync height in step with the ledger.
        self.sync.set_sync_height(self.ledger.latest_height());

        // Re-issue any block requests that have timed out.
        match self.sync.handle_block_request_timeouts(&self.router) {
            Ok(Some((requests, sync_peers))) => {
                self.send_block_requests(requests, sync_peers).await;
                return;
            }
            Ok(None) => {}
            Err(err) => {
                log_error(&err);
                return;
            }
        }

        if !self.sync.can_block_sync() {
            trace!("Nothing to sync. Will not issue new block requests");
            return;
        }

        // Prepare the next round of block requests.
        let (block_requests, sync_peers) = self.sync.prepare_block_requests();

        if block_requests.is_empty() {
            let total_requests = self.sync.num_total_block_requests();
            let num_outstanding = self.sync.num_outstanding_block_requests();
            if total_requests > 0 {
                trace!(
                    "Not block synced yet, but there are still {total_requests} in-flight requests, of which {num_outstanding} are awaiting responses."
                );
            } else {
                debug!("Not block synced yet, and there are no outstanding block requests or new block requests to send");
            }
        } else {
            self.send_block_requests(block_requests, sync_peers).await;
        }
    }

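    /// Sends the given block requests to the sync peers in batches.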
    async fn send_block_requests(
        &self,
        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            // Stop if sending fails, e.g. because the sync peers disconnected.
            if !self.sync.send_block_requests(self.router(), &sync_peers, requests).await {
                break;
            }

            // Pause between batches to avoid flooding peers with requests.
            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }

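    /// Initializes the task that verifies queued unconfirmed solutions and propagates valid ones.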
    fn initialize_solution_verification(&self) {
        let node = self.clone();
        self.spawn(async move {
            loop {
                // Exit the verification loop on shutdown.
                if node.shutdown.load(Acquire) {
                    info!("Shutting down solution verification");
                    break;
                }

                // Determine whether there is any work to do, and whether there is capacity for it.
                let queue_is_empty = node.solution_queue.lock().is_empty();
                let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;

                // If there is nothing to verify, or no capacity, sleep briefly and check again.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Drain the queue, verifying each solution on a blocking task.
                let mut solution_queue = node.solution_queue.lock();
                while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
                    let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    tokio::task::spawn_blocking(move || {
                        if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
                            // Skip solutions from provers that have reached their solution limit for the current epoch.
                            let prover_address = solution.address();
                            if _node.ledger.is_solution_limit_reached(&prover_address, 0) {
                                debug!("Invalid Solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch", fmt_id(solution.id()));
                                // Decrement the counter and skip this solution.
                                _node.num_verifying_solutions.fetch_sub(1, Relaxed);
                                return;
                            }
                            // Verify the solution against the latest epoch hash and proof target.
                            let proof_target = _node.ledger.latest_block().header().proof_target();
                            let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);

                            match is_valid {
                                // If the solution is valid, propagate it to connected peers.
                                Ok(()) => {
                                    let message = Message::UnconfirmedSolution(serialized);
                                    _node.propagate(message, &[peer_ip]);
                                }
                                Err(error) => {
                                    // Suppress the log near an epoch boundary, where stale solutions from the previous epoch are expected.
                                    if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
                                        debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
                                    }
                                }
                            }
                        } else {
                            warn!("Failed to retrieve the latest epoch hash.");
                        }
                        _node.num_verifying_solutions.fetch_sub(1, Relaxed);
                    });
                    // Stop draining once the parallelism limit is reached.
                    if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
                        break;
                    }
                }
            }
        });
    }

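    /// Initializes the task that verifies queued unconfirmed deployments and propagates valid ones.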
    fn initialize_deploy_verification(&self) {
        let node = self.clone();
        self.spawn(async move {
            loop {
                // Exit the verification loop on shutdown.
                if node.shutdown.load(Acquire) {
                    info!("Shutting down deployment verification");
                    break;
                }

                // Determine whether there is any work to do, and whether there is capacity for it.
                let queue_is_empty = node.deploy_queue.lock().is_empty();
                let counter_is_full =
                    node.num_verifying_deploys.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS;

                // If there is nothing to verify, or no capacity, sleep briefly and check again.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Drain the queue, verifying each deployment on a blocking task.
                while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
                    let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    tokio::task::spawn_blocking(move || {
                        // A deployment's global state root is carried by its fee transition.
                        let Some(state_root) = transaction.fee_transition().map(|t| t.global_state_root()) else {
                            debug!("Failed to access global state root for deployment from peer_ip {peer_ip}");
                            _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                            return;
                        };
                        // If the state root is unknown, this node may simply be behind; propagate without verifying.
                        if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
                            debug!("Failed to find global state root for deployment from peer_ip {peer_ip}, propagating anyway");
                            _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                            return;
                        }
                        // Perform the basic checks, and propagate the deployment if they pass.
                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
                            Ok(_) => {
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            }
                            Err(error) => {
                                debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
                            }
                        }
                        _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                    });
                    // Stop draining once the parallelism limit is reached.
                    if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS {
                        break;
                    }
                }
            }
        });
    }

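    /// Initializes the task that verifies queued unconfirmed executions and propagates valid ones.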
    fn initialize_execute_verification(&self) {
        let node = self.clone();
        self.spawn(async move {
            loop {
                // Exit the verification loop on shutdown.
                if node.shutdown.load(Acquire) {
                    info!("Shutting down execution verification");
                    break;
                }

                // Determine whether there is any work to do, and whether there is capacity for it.
                let queue_is_empty = node.execute_queue.lock().is_empty();
                let counter_is_full =
                    node.num_verifying_executions.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS;

                // If there is nothing to verify, or no capacity, sleep briefly and check again.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                // Drain the queue, verifying each execution on a blocking task.
                while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
                    let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    tokio::task::spawn_blocking(move || {
                        // Collect the global state roots from the execution and the fee transition, if present.
                        let state_roots = [
                            transaction.execution().map(|t| t.global_state_root()),
                            transaction.fee_transition().map(|t| t.global_state_root()),
                        ]
                        .into_iter()
                        .flatten();

                        // If any state root is unknown, this node may simply be behind; propagate without verifying.
                        for state_root in state_roots {
                            if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
                                debug!("Failed to find global state root for execution from peer_ip {peer_ip}, propagating anyway");
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                                _node.num_verifying_executions.fetch_sub(1, Relaxed);
                                return;
                            }
                        }
                        // Perform the basic checks, and propagate the execution if they pass.
                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
                            Ok(_) => {
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            }
                            Err(error) => {
                                debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
                            }
                        }
                        _node.num_verifying_executions.fetch_sub(1, Relaxed);
                    });
                    // Stop draining once the parallelism limit is reached.
                    if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS {
                        break;
                    }
                }
            }
        });
    }

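    /// Spawns the given future on a new task, and registers its handle so it is aborted on shutdown.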
    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }
}

#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
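    /// Shuts down the node.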
    async fn shut_down(&self) {
        info!("Shutting down...");

        // Signal all spawned tasks to stop.
        trace!("Shutting down the node...");
        self.shutdown.store(true, std::sync::atomic::Ordering::Release);

        // Abort the remaining task handles.
        trace!("Shutting down the client...");
        self.handles.lock().iter().for_each(|handle| handle.abort());

        // Shut down the router.
        self.router.shut_down().await;

        info!("Node has shut down.");
    }
}