1mod router;
17
18use crate::{
19 bft::{helpers::fmt_id, ledger_service::CoreLedgerService, spawn_blocking},
20 cdn::CdnBlockSync,
21 traits::NodeInterface,
22};
23
24use snarkos_account::Account;
25use snarkos_node_network::{ConnectionMode, NodeType};
26use snarkos_node_rest::Rest;
27use snarkos_node_router::{
28 Heartbeat,
29 Inbound,
30 Outbound,
31 Router,
32 Routing,
33 messages::{Message, UnconfirmedSolution, UnconfirmedTransaction},
34};
35use snarkos_node_sync::{BlockSync, Ping};
36use snarkos_node_tcp::{
37 P2P,
38 protocols::{Disconnect, Handshake, OnConnect, Reading},
39};
40use snarkos_utilities::{NodeDataDir, SignalHandler, Stoppable};
41
42use snarkvm::{
43 console::network::Network,
44 ledger::{
45 Ledger,
46 block::{Block, Header},
47 puzzle::{Puzzle, Solution, SolutionID},
48 store::ConsensusStorage,
49 },
50 prelude::{VM, block::Transaction},
51};
52
53use aleo_std::StorageMode;
54use anyhow::{Context, Result};
55use core::future::Future;
56#[cfg(feature = "locktick")]
57use locktick::parking_lot::Mutex;
58use lru::LruCache;
59#[cfg(not(feature = "locktick"))]
60use parking_lot::Mutex;
61use std::{
62 net::SocketAddr,
63 num::NonZeroUsize,
64 sync::{
65 Arc,
66 atomic::{
67 AtomicUsize,
68 Ordering::{Acquire, Relaxed},
69 },
70 },
71 time::Duration,
72};
73use tokio::{
74 task::JoinHandle,
75 time::{sleep, timeout},
76};
77
/// Upper bound on the number of solution verification tasks that may be in flight at once.
/// The solution verification loop stops spawning new tasks once this count is reached.
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
/// Number of entries the LRU queue of unverified deployment transactions can hold.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1024;
/// Number of entries the LRU queue of unverified execution transactions can hold.
const CAPACITY_FOR_EXECUTIONS: usize = 1024;
/// Number of entries the LRU queue of unverified solutions can hold.
const CAPACITY_FOR_SOLUTIONS: usize = 1024;
90
/// A queued transaction awaiting verification: the address of the peer it came from,
/// the serialized message that is re-wrapped in `Message::UnconfirmedTransaction` when
/// propagating, and the transaction itself, which is what gets verified.
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
/// A queued solution awaiting verification: the address of the peer it came from,
/// the serialized message that is re-wrapped in `Message::UnconfirmedSolution` when
/// propagating, and the solution itself, which is what gets verified.
type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);
97
/// A client node: it holds a ledger, syncs blocks from peers, and verifies unconfirmed
/// solutions and transactions before propagating them.
///
/// Cloning is how the background tasks obtain their own handle to the node; the queues,
/// counters, and task handles are behind `Arc`, so clones share them.
#[derive(Clone)]
pub struct Client<N: Network, C: ConsensusStorage<N>> {
    /// The ledger, loaded from the genesis block and storage mode passed to `new`.
    ledger: Ledger<N, C>,
    /// The router, created with `NodeType::Client`.
    router: Router<N>,
    /// The REST server; `None` unless a REST address was passed to `new`.
    rest: Option<Rest<N, C, Self>>,
    /// The block sync module, created in `ConnectionMode::Router`.
    sync: Arc<BlockSync<N>>,
    /// The genesis block passed to `new`.
    genesis: Block<N>,
    /// The puzzle, cloned from the ledger; used to check solutions.
    puzzle: Puzzle<N>,
    /// LRU queue of solutions waiting to be verified, bounded by `CAPACITY_FOR_SOLUTIONS`.
    /// NOTE(review): nothing in this file enqueues entries - presumably the router module does.
    solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
    /// LRU queue of deployments waiting to be verified, bounded by `CAPACITY_FOR_DEPLOYMENTS`.
    deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// LRU queue of executions waiting to be verified, bounded by `CAPACITY_FOR_EXECUTIONS`.
    execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// Number of solution verification tasks currently in flight.
    num_verifying_solutions: Arc<AtomicUsize>,
    /// Number of deployment verification tasks currently in flight.
    num_verifying_deploys: Arc<AtomicUsize>,
    /// Number of execution verification tasks currently in flight.
    num_verifying_executions: Arc<AtomicUsize>,
    /// Handles of the tasks started through `spawn`; all of them are aborted on shutdown.
    pub(crate) handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The ping service; its block locators are refreshed whenever sync yields new blocks.
    ping: Arc<Ping<N>>,
    /// The signal handler; the background loops poll it to learn when to stop.
    signal_handler: Arc<SignalHandler>,
}
132
133impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
134 pub async fn new(
136 node_ip: SocketAddr,
137 rest_ip: Option<SocketAddr>,
138 rest_rps: u32,
139 account: Account<N>,
140 trusted_peers: &[SocketAddr],
141 genesis: Block<N>,
142 cdn: Option<http::Uri>,
143 storage_mode: StorageMode,
144 node_data_dir: NodeDataDir,
145 trusted_peers_only: bool,
146 dev: Option<u16>,
147 _slipstream_configs: &[std::path::PathBuf],
148 signal_handler: Arc<SignalHandler>,
149 ) -> Result<Self> {
150 let ledger = {
152 let storage_mode = storage_mode.clone();
153 let genesis = genesis.clone();
154
155 spawn_blocking!(Ledger::<N, C>::load(genesis, storage_mode))
156 }
157 .with_context(|| "Failed to initialize the ledger")?;
158
159 #[cfg(feature = "slipstream-plugins")]
161 if !_slipstream_configs.is_empty() {
162 let manager =
163 snarkvm::slipstream_plugin_manager::SlipstreamPluginManager::from_config_files(_slipstream_configs)
164 .context("Failed to initialize Slipstream plugin manager")?;
165 ledger.vm().finalize_store().set_slipstream_plugin_manager(manager);
166 let num_plugins = _slipstream_configs.len();
167 tracing::info!(target: "slipstream", "Slipstream plugin manager registered ({num_plugins} plugin(s))");
168 }
169
170 let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), signal_handler.clone()));
172 let router = Router::new(
174 node_ip,
175 NodeType::Client,
176 account,
177 ledger_service.clone(),
178 trusted_peers,
179 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
180 trusted_peers_only,
181 node_data_dir.clone(),
182 dev.is_some(),
183 )
184 .await?;
185
186 let sync = Arc::new(BlockSync::new(ledger_service.clone(), ConnectionMode::Router));
188
189 let locators = sync.get_block_locators()?;
191 let ping = Arc::new(Ping::new(router.clone(), locators));
192
193 let mut node = Self {
195 ledger: ledger.clone(),
196 router,
197 rest: None,
198 sync: sync.clone(),
199 genesis,
200 ping,
201 puzzle: ledger.puzzle().clone(),
202 solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
203 deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
204 execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
205 num_verifying_solutions: Default::default(),
206 num_verifying_deploys: Default::default(),
207 num_verifying_executions: Default::default(),
208 handles: Default::default(),
209 signal_handler: signal_handler.clone(),
210 };
211
212 let cdn_sync = cdn.map(|base_url| {
214 trace!("CDN sync is enabled");
215 Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler))
216 });
217
218 if let Some(rest_ip) = rest_ip {
220 node.rest = Some(
221 Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone()), cdn_sync.clone(), sync)
222 .await?,
223 );
224 }
225
226 if let Some(cdn_sync) = cdn_sync {
228 if let Err(error) = cdn_sync.wait().await.with_context(|| "Failed to synchronize from the CDN") {
229 crate::log_clean_error(&storage_mode);
230 node.shut_down().await;
231 return Err(error);
232 }
233 }
234
235 node.initialize_routing().await;
237 node.initialize_sync();
239 node.initialize_solution_verification();
241 node.initialize_deploy_verification();
243 node.initialize_execute_verification();
245 node.handles.lock().push(crate::start_notification_message_loop());
247 Ok(node)
249 }
250
251 pub fn ledger(&self) -> &Ledger<N, C> {
253 &self.ledger
254 }
255
256 pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
258 &self.rest
259 }
260
261 pub fn router(&self) -> &Router<N> {
263 &self.router
264 }
265}
266
267impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
269 const MAX_SYNC_INTERVAL: Duration = Duration::from_secs(30);
272
273 fn initialize_sync(&self) {
275 let self_ = self.clone();
277 self.spawn(async move {
278 while !self_.signal_handler.is_stopped() {
279 let _ = timeout(Self::MAX_SYNC_INTERVAL, self_.sync.wait_for_peer_update()).await;
281
282 self_.try_issuing_block_requests().await;
284 }
285
286 info!("Stopped block request generation");
287 });
288
289 let self_ = self.clone();
291 self.spawn(async move {
292 while !self_.signal_handler.is_stopped() {
293 let _ = timeout(Self::MAX_SYNC_INTERVAL, self_.sync.wait_for_block_responses()).await;
295
296 self_.try_advancing_block_synchronization().await;
298
299 }
302
303 debug!("Stopped block response processing");
304 });
305 }
306
307 async fn try_advancing_block_synchronization(&self) {
309 let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
310 Ok(val) => val,
311 Err(err) => {
312 error!("Block synchronization failed - {err}");
313 return;
314 }
315 };
316
317 if has_new_blocks {
319 match self.sync.get_block_locators() {
320 Ok(locators) => self.ping.update_block_locators(locators),
321 Err(err) => error!("Failed to get block locators: {err}"),
322 }
323 }
324 }
325
326 async fn try_issuing_block_requests(&self) {
328 self.sync.try_issuing_block_requests(self.router()).await;
329 }
330
331 fn initialize_solution_verification(&self) {
333 let node = self.clone();
335 self.spawn(async move {
336 loop {
337 if node.signal_handler.is_stopped() {
339 info!("Shutting down solution verification");
340 break;
341 }
342
343 let queue_is_empty = node.solution_queue.lock().is_empty();
345 let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;
347
348 if queue_is_empty || counter_is_full {
350 sleep(Duration::from_millis(50)).await;
351 continue;
352 }
353
354 let mut solution_queue = node.solution_queue.lock();
356 while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
357 let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
359 let _node = node.clone();
360 tokio::task::spawn_blocking(move || {
362 if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
364 let prover_address = solution.address();
368 if _node.ledger.is_solution_limit_reached(&prover_address, 0) {
369 debug!("Invalid Solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch", fmt_id(solution.id()));
370 }
371 let proof_target = _node.ledger.latest_block().header().proof_target();
373 let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);
375
376 match is_valid {
377 Ok(()) => {
379 let message = Message::UnconfirmedSolution(serialized);
380 _node.propagate(message, &[peer_ip]);
382 }
383 Err(error) => {
385 if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
386 debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
387 }
388 }
389 }
390 } else {
391 warn!("Failed to retrieve the latest epoch hash.");
392 }
393 _node.num_verifying_solutions.fetch_sub(1, Relaxed);
395 });
396 if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
398 break;
399 }
400 }
401 }
402 });
403 }
404
405 fn initialize_deploy_verification(&self) {
407 let node = self.clone();
409 self.spawn(async move {
410 loop {
411 if node.signal_handler.is_stopped() {
413 info!("Shutting down deployment verification");
414 break;
415 }
416
417 let queue_is_empty = node.deploy_queue.lock().is_empty();
419 let counter_is_full =
421 node.num_verifying_deploys.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS;
422
423 if queue_is_empty || counter_is_full {
425 sleep(Duration::from_millis(50)).await;
426 continue;
427 }
428
429 while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
431 let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
433 let _node = node.clone();
434 tokio::task::spawn_blocking(move || {
436 let Some(state_root) = transaction.fee_transition().map(|t| t.global_state_root()) else {
438 debug!("Failed to access global state root for deployment from peer_ip {peer_ip}");
439 _node.num_verifying_deploys.fetch_sub(1, Relaxed);
440 return;
441 };
442 if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
444 debug!("Failed to find global state root for deployment from peer_ip {peer_ip}, propagating anyway");
445 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
447 _node.num_verifying_deploys.fetch_sub(1, Relaxed);
448 return;
449 }
451 match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::rng()) {
453 Ok(_) => {
454 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
456 }
457 Err(error) => {
458 debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
459 }
460 }
461 _node.num_verifying_deploys.fetch_sub(1, Relaxed);
463 });
464 if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS {
466 break;
467 }
468 }
469 }
470 });
471 }
472
473 fn initialize_execute_verification(&self) {
475 let node = self.clone();
477 self.spawn(async move {
478 loop {
479 if node.signal_handler.is_stopped() {
481 info!("Shutting down execution verification");
482 break;
483 }
484
485 let queue_is_empty = node.execute_queue.lock().is_empty();
487 let counter_is_full =
489 node.num_verifying_executions.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS;
490
491 if queue_is_empty || counter_is_full {
493 sleep(Duration::from_millis(50)).await;
494 continue;
495 }
496
497 while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
499 let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
501 let _node = node.clone();
502 tokio::task::spawn_blocking(move || {
504 let state_roots = [
506 transaction.execution().map(|t| t.global_state_root()),
507 transaction.fee_transition().map(|t| t.global_state_root()),
508 ]
509 .into_iter()
510 .flatten();
511
512 for state_root in state_roots {
513 if !_node.ledger().contains_state_root(&state_root).unwrap_or(false) {
514 debug!("Failed to find global state root for execution from peer_ip {peer_ip}, propagating anyway");
515 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
517 _node.num_verifying_executions.fetch_sub(1, Relaxed);
518 return;
519 }
521 }
522 match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::rng()) {
524 Ok(_) => {
525 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
527 }
528 Err(error) => {
529 debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
530 }
531 }
532 _node.num_verifying_executions.fetch_sub(1, Relaxed);
534 });
535 if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS {
537 break;
538 }
539 }
540 }
541 });
542 }
543
544 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
546 self.handles.lock().push(tokio::spawn(future));
547 }
548}
549
550#[async_trait]
551impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
552 async fn shut_down(&self) {
554 info!("Shutting down...");
555
556 trace!("Shutting down the node...");
558
559 #[cfg(feature = "slipstream-plugins")]
561 if let Some(manager) = self.ledger.vm().finalize_store().slipstream_plugin_manager().write().as_mut() {
562 manager.unload();
563 }
564
565 if let Some(rest) = &self.rest {
567 trace!("Shutting down the REST server...");
568 rest.shut_down();
569 }
570
571 trace!("Shutting down the client...");
573 self.handles.lock().iter().for_each(|handle| handle.abort());
574
575 self.router.shut_down().await;
577
578 info!("Node has shut down.");
579 }
580}