1mod router;
17
18use crate::{
19 bft::{events::DataBlocks, helpers::fmt_id, ledger_service::CoreLedgerService, spawn_blocking},
20 cdn::CdnBlockSync,
21 traits::NodeInterface,
22};
23
24use snarkos_account::Account;
25use snarkos_node_network::NodeType;
26use snarkos_node_rest::Rest;
27use snarkos_node_router::{
28 Heartbeat,
29 Inbound,
30 Outbound,
31 Router,
32 Routing,
33 messages::{Message, UnconfirmedSolution, UnconfirmedTransaction},
34};
35use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
36use snarkos_node_tcp::{
37 P2P,
38 protocols::{Disconnect, Handshake, OnConnect, Reading},
39};
40use snarkos_utilities::{NodeDataDir, SignalHandler, Stoppable};
41
42use snarkvm::{
43 console::network::Network,
44 ledger::{
45 Ledger,
46 block::{Block, Header},
47 puzzle::{Puzzle, Solution, SolutionID},
48 store::ConsensusStorage,
49 },
50 prelude::{VM, block::Transaction},
51 utilities::flatten_error,
52};
53
54use aleo_std::StorageMode;
55use anyhow::{Context, Result};
56use core::future::Future;
57use indexmap::IndexMap;
58#[cfg(feature = "locktick")]
59use locktick::parking_lot::Mutex;
60use lru::LruCache;
61#[cfg(not(feature = "locktick"))]
62use parking_lot::Mutex;
63use std::{
64 net::SocketAddr,
65 num::NonZeroUsize,
66 sync::{
67 Arc,
68 atomic::{
69 AtomicUsize,
70 Ordering::{Acquire, Relaxed},
71 },
72 },
73 time::Duration,
74};
75use tokio::{
76 task::JoinHandle,
77 time::{sleep, timeout},
78};
79
/// Upper bound on the number of solution verifications that may be in flight at once.
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
/// Number of entries the LRU queue of pending deployments can hold (2^10).
const CAPACITY_FOR_DEPLOYMENTS: usize = 1024;
/// Number of entries the LRU queue of pending executions can hold (2^10).
const CAPACITY_FOR_EXECUTIONS: usize = 1024;
/// Number of entries the LRU queue of pending solutions can hold (2^10).
const CAPACITY_FOR_SOLUTIONS: usize = 1024;
92
/// An entry of the deployment/execution queues: the peer address the entry is attributed to,
/// the `UnconfirmedTransaction` message that gets propagated, and the `Transaction` that is verified.
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
/// An entry of the solution queue: the peer address the entry is attributed to,
/// the `UnconfirmedSolution` message that gets propagated, and the `Solution` that is verified.
type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);
99
/// A client node: it is registered with the router as `NodeType::Client`, synchronizes blocks,
/// and verifies queued solutions and transactions before propagating them.
///
/// Cloning is used to hand a copy of the node to every background task.
#[derive(Clone)]
pub struct Client<N: Network, C: ConsensusStorage<N>> {
    /// The ledger, loaded in `new` from the genesis block and the storage mode.
    ledger: Ledger<N, C>,
    /// The router used for peer-to-peer communication.
    router: Router<N>,
    /// The REST server; `Some` only when a REST address was passed to `new`.
    rest: Option<Rest<N, C, Self>>,
    /// The block synchronization module.
    sync: Arc<BlockSync<N>>,
    /// The genesis block passed to `new`.
    genesis: Block<N>,
    /// The puzzle (cloned from the ledger) used to check solutions.
    puzzle: Puzzle<N>,
    /// LRU-bounded queue of solutions awaiting verification, keyed by solution ID.
    solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
    /// LRU-bounded queue of deployments awaiting verification, keyed by transaction ID.
    deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// LRU-bounded queue of executions awaiting verification, keyed by transaction ID.
    execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// Number of solution verifications currently in flight.
    num_verifying_solutions: Arc<AtomicUsize>,
    /// Number of deployment verifications currently in flight.
    num_verifying_deploys: Arc<AtomicUsize>,
    /// Number of execution verifications currently in flight.
    num_verifying_executions: Arc<AtomicUsize>,
    /// Join handles of the spawned background tasks; they are aborted in `shut_down`.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Ping helper; it receives the latest block locators whenever synchronization advances.
    ping: Arc<Ping<N>>,
    /// Shared stop signal that the background loops poll to know when to exit.
    signal_handler: Arc<SignalHandler>,
}
134
135impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
136 pub async fn new(
138 node_ip: SocketAddr,
139 rest_ip: Option<SocketAddr>,
140 rest_rps: u32,
141 account: Account<N>,
142 trusted_peers: &[SocketAddr],
143 genesis: Block<N>,
144 cdn: Option<http::Uri>,
145 storage_mode: StorageMode,
146 node_data_dir: NodeDataDir,
147 trusted_peers_only: bool,
148 dev: Option<u16>,
149 signal_handler: Arc<SignalHandler>,
150 ) -> Result<Self> {
151 let ledger = {
153 let storage_mode = storage_mode.clone();
154 let genesis = genesis.clone();
155
156 spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode))
157 }
158 .with_context(|| "Failed to initialize the ledger")?;
159
160 let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), signal_handler.clone()));
162 let router = Router::new(
164 node_ip,
165 NodeType::Client,
166 account,
167 ledger_service.clone(),
168 trusted_peers,
169 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
170 trusted_peers_only,
171 node_data_dir.clone(),
172 dev.is_some(),
173 )
174 .await?;
175
176 let sync = Arc::new(BlockSync::new(ledger_service.clone()));
178
179 let locators = sync.get_block_locators()?;
181 let ping = Arc::new(Ping::new(router.clone(), locators));
182
183 let mut node = Self {
185 ledger: ledger.clone(),
186 router,
187 rest: None,
188 sync: sync.clone(),
189 genesis,
190 ping,
191 puzzle: ledger.puzzle().clone(),
192 solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
193 deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
194 execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
195 num_verifying_solutions: Default::default(),
196 num_verifying_deploys: Default::default(),
197 num_verifying_executions: Default::default(),
198 handles: Default::default(),
199 signal_handler: signal_handler.clone(),
200 };
201
202 let cdn_sync = cdn.map(|base_url| {
204 trace!("CDN sync is enabled");
205 Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler))
206 });
207
208 if let Some(rest_ip) = rest_ip {
210 node.rest = Some(
211 Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone()), cdn_sync.clone(), sync)
212 .await?,
213 );
214 }
215
216 if let Some(cdn_sync) = cdn_sync {
218 if let Err(error) = cdn_sync.wait().await.with_context(|| "Failed to synchronize from the CDN") {
219 crate::log_clean_error(&storage_mode);
220 node.shut_down().await;
221 return Err(error);
222 }
223 }
224
225 node.initialize_routing().await;
227 node.initialize_sync();
229 node.initialize_solution_verification();
231 node.initialize_deploy_verification();
233 node.initialize_execute_verification();
235 node.handles.lock().push(crate::start_notification_message_loop());
237 Ok(node)
239 }
240
/// Returns a reference to the ledger.
pub fn ledger(&self) -> &Ledger<N, C> {
    &self.ledger
}
245
/// Returns a reference to the REST server slot; it is `None` when no REST server was started.
pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
    &self.rest
}
250
/// Returns a reference to the router.
pub fn router(&self) -> &Router<N> {
    &self.router
}
255}
256
257impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
/// Longest time the sync loops wait for a notification before running their routine anyway.
const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);
262
263 fn initialize_sync(&self) {
265 let self_ = self.clone();
267 self.spawn(async move {
268 while !self_.signal_handler.is_stopped() {
269 self_.try_issuing_block_requests().await;
271 }
272
273 info!("Stopped block request generation");
274 });
275
276 let self_ = self.clone();
278 self.spawn(async move {
279 while !self_.signal_handler.is_stopped() {
280 let _ = timeout(Self::MAX_SYNC_INTERVAL, self_.sync.wait_for_block_responses()).await;
282
283 self_.try_advancing_block_synchronization().await;
285
286 }
289
290 debug!("Stopped block response processing");
291 });
292 }
293
294 async fn try_advancing_block_synchronization(&self) {
296 let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
297 Ok(val) => val,
298 Err(err) => {
299 error!("Block synchronization failed - {err}");
300 return;
301 }
302 };
303
304 if has_new_blocks {
306 match self.sync.get_block_locators() {
307 Ok(locators) => self.ping.update_block_locators(locators),
308 Err(err) => error!("Failed to get block locators: {err}"),
309 }
310 }
311 }
312
313 async fn try_issuing_block_requests(&self) {
315 let _ = timeout(Self::MAX_SYNC_INTERVAL, self.sync.wait_for_peer_update()).await;
317
318 self.sync.set_sync_height(self.ledger.latest_height());
321
322 match self.sync.handle_block_request_timeouts(&self.router) {
323 Ok(Some((requests, sync_peers))) => {
324 self.send_block_requests(requests, sync_peers).await;
326 return;
327 }
328 Ok(None) => {}
329 Err(err) => {
330 error!("{}", flatten_error(&err));
332 return;
333 }
334 }
335
336 if !self.sync.can_block_sync() {
339 trace!("Nothing to sync. Will not issue new block requests");
340 return;
341 }
342
343 let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
345 Ok(val) => val,
346 Err(err) => {
347 error!("{err}");
348 return;
349 }
350 };
351
352 if has_new_blocks {
353 match self.sync.get_block_locators() {
354 Ok(locators) => self.ping.update_block_locators(locators),
355 Err(err) => error!("Failed to get block locators: {err}"),
356 }
357
358 if !self.sync.can_block_sync() {
360 return;
361 }
362 }
363
364 let (block_requests, sync_peers) = self.sync.prepare_block_requests();
367
368 if block_requests.is_empty() {
371 let total_requests = self.sync.num_total_block_requests();
372 let num_outstanding = self.sync.num_outstanding_block_requests();
373 if total_requests > 0 {
374 trace!(
375 "Not block synced yet, but there are still {total_requests} in-flight requests. {num_outstanding} are still awaiting responses."
376 );
377 } else {
378 debug!(
380 "Not block synced yet, and there are no outstanding block requests or \
381 new block requests to send"
382 );
383 }
384 } else {
385 self.send_block_requests(block_requests, sync_peers).await;
386 }
387 }
388
389 async fn send_block_requests(
390 &self,
391 block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
392 sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
393 ) {
394 for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
396 if !self.sync.send_block_requests(self.router(), &sync_peers, requests).await {
397 break;
399 }
400
401 tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
403 }
404 }
405
406 fn initialize_solution_verification(&self) {
408 let node = self.clone();
410 self.spawn(async move {
411 loop {
412 if node.signal_handler.is_stopped() {
414 info!("Shutting down solution verification");
415 break;
416 }
417
418 let queue_is_empty = node.solution_queue.lock().is_empty();
420 let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;
422
423 if queue_is_empty || counter_is_full {
425 sleep(Duration::from_millis(50)).await;
426 continue;
427 }
428
429 let mut solution_queue = node.solution_queue.lock();
431 while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
432 let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
434 let _node = node.clone();
435 tokio::task::spawn_blocking(move || {
437 if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
439 let prover_address = solution.address();
443 if _node.ledger.is_solution_limit_reached(&prover_address, 0) {
444 debug!("Invalid Solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch", fmt_id(solution.id()));
445 }
446 let proof_target = _node.ledger.latest_block().header().proof_target();
448 let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);
450
451 match is_valid {
452 Ok(()) => {
454 let message = Message::UnconfirmedSolution(serialized);
455 _node.propagate(message, &[peer_ip]);
457 }
458 Err(error) => {
460 if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
461 debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
462 }
463 }
464 }
465 } else {
466 warn!("Failed to retrieve the latest epoch hash.");
467 }
468 _node.num_verifying_solutions.fetch_sub(1, Relaxed);
470 });
471 if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
473 break;
474 }
475 }
476 }
477 });
478 }
479
480 fn initialize_deploy_verification(&self) {
482 let node = self.clone();
484 self.spawn(async move {
485 loop {
486 if node.signal_handler.is_stopped() {
488 info!("Shutting down deployment verification");
489 break;
490 }
491
492 let queue_is_empty = node.deploy_queue.lock().is_empty();
494 let counter_is_full =
496 node.num_verifying_deploys.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS;
497
498 if queue_is_empty || counter_is_full {
500 sleep(Duration::from_millis(50)).await;
501 continue;
502 }
503
504 while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
506 let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
508 let _node = node.clone();
509 tokio::task::spawn_blocking(move || {
511 let Some(state_root) = transaction.fee_transition().map(|t| t.global_state_root()) else {
513 debug!("Failed to access global state root for deployment from peer_ip {peer_ip}");
514 _node.num_verifying_deploys.fetch_sub(1, Relaxed);
515 return;
516 };
517 if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
519 debug!("Failed to find global state root for deployment from peer_ip {peer_ip}, propagating anyway");
520 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
522 _node.num_verifying_deploys.fetch_sub(1, Relaxed);
523 return;
524 }
526 match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
528 Ok(_) => {
529 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
531 }
532 Err(error) => {
533 debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
534 }
535 }
536 _node.num_verifying_deploys.fetch_sub(1, Relaxed);
538 });
539 if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS {
541 break;
542 }
543 }
544 }
545 });
546 }
547
548 fn initialize_execute_verification(&self) {
550 let node = self.clone();
552 self.spawn(async move {
553 loop {
554 if node.signal_handler.is_stopped() {
556 info!("Shutting down execution verification");
557 break;
558 }
559
560 let queue_is_empty = node.execute_queue.lock().is_empty();
562 let counter_is_full =
564 node.num_verifying_executions.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS;
565
566 if queue_is_empty || counter_is_full {
568 sleep(Duration::from_millis(50)).await;
569 continue;
570 }
571
572 while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
574 let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
576 let _node = node.clone();
577 tokio::task::spawn_blocking(move || {
579 let state_roots = [
581 transaction.execution().map(|t| t.global_state_root()),
582 transaction.fee_transition().map(|t| t.global_state_root()),
583 ]
584 .into_iter()
585 .flatten();
586
587 for state_root in state_roots {
588 if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
589 debug!("Failed to find global state root for execution from peer_ip {peer_ip}, propagating anyway");
590 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
592 _node.num_verifying_executions.fetch_sub(1, Relaxed);
593 return;
594 }
596 }
597 match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
599 Ok(_) => {
600 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
602 }
603 Err(error) => {
604 debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
605 }
606 }
607 _node.num_verifying_executions.fetch_sub(1, Relaxed);
609 });
610 if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS {
612 break;
613 }
614 }
615 }
616 });
617 }
618
619 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
621 self.handles.lock().push(tokio::spawn(future));
622 }
623}
624
625#[async_trait]
626impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
627 async fn shut_down(&self) {
629 info!("Shutting down...");
630
631 trace!("Shutting down the node...");
633
634 trace!("Shutting down the client...");
636 self.handles.lock().iter().for_each(|handle| handle.abort());
637
638 self.router.shut_down().await;
640
641 info!("Node has shut down.");
642 }
643}