1mod router;
17
18use crate::traits::NodeInterface;
19
20use snarkos_account::Account;
21use snarkos_node_bft::{events::DataBlocks, helpers::fmt_id, ledger_service::CoreLedgerService};
22use snarkos_node_cdn::CdnBlockSync;
23use snarkos_node_rest::Rest;
24use snarkos_node_router::{
25 Heartbeat,
26 Inbound,
27 Outbound,
28 Router,
29 Routing,
30 messages::{Message, NodeType, UnconfirmedSolution, UnconfirmedTransaction},
31};
32use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
33use snarkos_node_tcp::{
34 P2P,
35 protocols::{Disconnect, Handshake, OnConnect, Reading},
36};
37use snarkvm::{
38 console::network::Network,
39 ledger::{
40 Ledger,
41 block::{Block, Header},
42 puzzle::{Puzzle, Solution, SolutionID},
43 store::ConsensusStorage,
44 },
45 prelude::{VM, block::Transaction},
46};
47
48use aleo_std::StorageMode;
49use anyhow::Result;
50use core::future::Future;
51use indexmap::IndexMap;
52#[cfg(feature = "locktick")]
53use locktick::parking_lot::Mutex;
54use lru::LruCache;
55#[cfg(not(feature = "locktick"))]
56use parking_lot::Mutex;
57use std::{
58 net::SocketAddr,
59 num::NonZeroUsize,
60 sync::{
61 Arc,
62 atomic::{
63 AtomicBool,
64 AtomicUsize,
65 Ordering::{Acquire, Relaxed},
66 },
67 },
68 time::{Duration, Instant},
69};
70use tokio::{
71 task::JoinHandle,
72 time::{sleep, timeout},
73};
74
75const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
78const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
81const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
84const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
87
88type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
91type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);
94
95#[derive(Clone)]
97pub struct Client<N: Network, C: ConsensusStorage<N>> {
98 ledger: Ledger<N, C>,
100 router: Router<N>,
102 rest: Option<Rest<N, C, Self>>,
104 sync: Arc<BlockSync<N>>,
106 genesis: Block<N>,
108 puzzle: Puzzle<N>,
110 solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
112 deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
114 execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
116 num_verifying_solutions: Arc<AtomicUsize>,
118 num_verifying_deploys: Arc<AtomicUsize>,
120 num_verifying_executions: Arc<AtomicUsize>,
122 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
124 shutdown: Arc<AtomicBool>,
126 ping: Arc<Ping<N>>,
128}
129
130impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
131 pub async fn new(
133 node_ip: SocketAddr,
134 rest_ip: Option<SocketAddr>,
135 rest_rps: u32,
136 account: Account<N>,
137 trusted_peers: &[SocketAddr],
138 genesis: Block<N>,
139 cdn: Option<http::Uri>,
140 storage_mode: StorageMode,
141 rotate_external_peers: bool,
142 dev: Option<u16>,
143 shutdown: Arc<AtomicBool>,
144 ) -> Result<Self> {
145 let signal_node = Self::handle_signals(shutdown.clone());
147
148 let ledger = Ledger::<N, C>::load(genesis.clone(), storage_mode.clone())?;
150
151 let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), shutdown.clone()));
153 let allow_external_peers = true;
155
156 let router = Router::new(
158 node_ip,
159 NodeType::Client,
160 account,
161 ledger_service.clone(),
162 trusted_peers,
163 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
164 rotate_external_peers,
165 allow_external_peers,
166 dev.is_some(),
167 )
168 .await?;
169
170 let sync = Arc::new(BlockSync::new(ledger_service.clone()));
172
173 let locators = sync.get_block_locators()?;
175 let ping = Arc::new(Ping::new(router.clone(), locators));
176
177 let mut node = Self {
179 ledger: ledger.clone(),
180 router,
181 rest: None,
182 sync: sync.clone(),
183 genesis,
184 ping,
185 puzzle: ledger.puzzle().clone(),
186 solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
187 deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
188 execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
189 num_verifying_solutions: Default::default(),
190 num_verifying_deploys: Default::default(),
191 num_verifying_executions: Default::default(),
192 handles: Default::default(),
193 shutdown: shutdown.clone(),
194 };
195
196 let cdn_sync = cdn.map(|base_url| {
198 trace!("CDN sync is enabled");
199 Arc::new(CdnBlockSync::new(base_url, ledger.clone(), shutdown))
200 });
201
202 if let Some(rest_ip) = rest_ip {
204 node.rest = Some(
205 Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone()), cdn_sync.clone(), sync)
206 .await?,
207 );
208 }
209
210 if let Some(cdn_sync) = cdn_sync {
212 if let Err(error) = cdn_sync.wait().await {
213 crate::log_clean_error(&storage_mode);
214 node.shut_down().await;
215 return Err(error);
216 }
217 }
218
219 node.initialize_routing().await;
221 node.initialize_sync();
223 node.initialize_solution_verification();
225 node.initialize_deploy_verification();
227 node.initialize_execute_verification();
229 node.handles.lock().push(crate::start_notification_message_loop());
231 let _ = signal_node.set(node.clone());
233 Ok(node)
235 }
236
237 pub fn ledger(&self) -> &Ledger<N, C> {
239 &self.ledger
240 }
241
242 pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
244 &self.rest
245 }
246
247 pub fn router(&self) -> &Router<N> {
249 &self.router
250 }
251}
252
253impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
255 const SYNC_INTERVAL: Duration = std::time::Duration::from_secs(5);
256
257 fn initialize_sync(&self) {
259 let _self = self.clone();
261 let mut last_update = Instant::now();
262
263 self.handles.lock().push(tokio::spawn(async move {
264 loop {
265 if _self.shutdown.load(std::sync::atomic::Ordering::Acquire) {
267 info!("Shutting down block production");
268 break;
269 }
270
271 let now = Instant::now();
273 let elapsed = now.saturating_duration_since(last_update);
274 let sleep_time = Self::SYNC_INTERVAL.saturating_sub(elapsed);
275
276 if !sleep_time.is_zero() {
277 sleep(sleep_time).await;
278 }
279
280 _self.try_block_sync().await;
282 last_update = now;
283 }
284 }));
285 }
286
287 async fn try_block_sync(&self) {
289 let _ = timeout(Self::SYNC_INTERVAL, self.sync.wait_for_update()).await;
291
292 self.sync.set_sync_height(self.ledger.latest_height());
295
296 let new_requests = self.sync.handle_block_request_timeouts(self);
297 if let Some((block_requests, sync_peers)) = new_requests {
298 self.send_block_requests(block_requests, sync_peers).await;
299 }
300
301 if !self.sync.can_block_sync() {
304 return;
305 }
306
307 let (block_requests, sync_peers) = self.sync.prepare_block_requests();
310
311 if block_requests.is_empty() && self.sync.has_pending_responses() {
314 trace!("No block requests to send. Will process pending responses.");
316 let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
317 Ok(val) => val,
318 Err(err) => {
319 error!("{err}");
320 return;
321 }
322 };
323
324 if has_new_blocks {
325 match self.sync.get_block_locators() {
326 Ok(locators) => self.ping.update_block_locators(locators),
327 Err(err) => error!("Failed to get block locators: {err}"),
328 }
329 }
330 } else if block_requests.is_empty() {
331 let total_requests = self.sync.num_total_block_requests();
332 let num_outstanding = self.sync.num_outstanding_block_requests();
333 if total_requests > 0 {
334 trace!(
335 "Not block synced yet, but there are still {total_requests} in-flight requests. {num_outstanding} are still awaiting responses."
336 );
337 } else {
338 debug!(
340 "Not block synced yet, and there are no outstanding block requests or \
341 new block requests to send"
342 );
343 }
344 } else {
345 self.send_block_requests(block_requests, sync_peers).await;
346 }
347 }
348
349 async fn send_block_requests(
350 &self,
351 block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
352 sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
353 ) {
354 for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
356 if !self.sync.send_block_requests(self, &sync_peers, requests).await {
357 break;
359 }
360
361 tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
363 }
364 }
365
366 fn initialize_solution_verification(&self) {
368 let node = self.clone();
370 self.handles.lock().push(tokio::spawn(async move {
371 loop {
372 if node.shutdown.load(Acquire) {
374 info!("Shutting down solution verification");
375 break;
376 }
377
378 let queue_is_empty = node.solution_queue.lock().is_empty();
380 let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;
382
383 if queue_is_empty || counter_is_full {
385 sleep(Duration::from_millis(50)).await;
386 continue;
387 }
388
389 let mut solution_queue = node.solution_queue.lock();
391 while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
392 let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
394 let _node = node.clone();
395 tokio::task::spawn_blocking(move || {
397 if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
399 let prover_address = solution.address();
403 if _node.ledger.is_solution_limit_reached(&prover_address, 0) {
404 debug!("Invalid Solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch", fmt_id(solution.id()));
405 }
406 let proof_target = _node.ledger.latest_block().header().proof_target();
408 let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);
410
411 match is_valid {
412 Ok(()) => {
414 let message = Message::UnconfirmedSolution(serialized);
415 _node.propagate(message, &[peer_ip]);
417 }
418 Err(error) => {
420 if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
421 debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
422 }
423 }
424 }
425 } else {
426 warn!("Failed to retrieve the latest epoch hash.");
427 }
428 _node.num_verifying_solutions.fetch_sub(1, Relaxed);
430 });
431 if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
433 break;
434 }
435 }
436 }
437 }));
438 }
439
440 fn initialize_deploy_verification(&self) {
442 let node = self.clone();
444 self.handles.lock().push(tokio::spawn(async move {
445 loop {
446 if node.shutdown.load(Acquire) {
448 info!("Shutting down deployment verification");
449 break;
450 }
451
452 let queue_is_empty = node.deploy_queue.lock().is_empty();
454 let counter_is_full =
456 node.num_verifying_deploys.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS;
457
458 if queue_is_empty || counter_is_full {
460 sleep(Duration::from_millis(50)).await;
461 continue;
462 }
463
464 while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
466 let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
468 let _node = node.clone();
469 tokio::task::spawn_blocking(move || {
471 match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
473 Ok(_) => {
474 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
476 }
477 Err(error) => {
478 debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
479 }
480 }
481 _node.num_verifying_deploys.fetch_sub(1, Relaxed);
483 });
484 if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_DEPLOY_VERIFICATIONS {
486 break;
487 }
488 }
489 }
490 }));
491 }
492
493 fn initialize_execute_verification(&self) {
495 let node = self.clone();
497 self.handles.lock().push(tokio::spawn(async move {
498 loop {
499 if node.shutdown.load(Acquire) {
501 info!("Shutting down execution verification");
502 break;
503 }
504
505 let queue_is_empty = node.execute_queue.lock().is_empty();
507 let counter_is_full =
509 node.num_verifying_executions.load(Acquire) >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS;
510
511 if queue_is_empty || counter_is_full {
513 sleep(Duration::from_millis(50)).await;
514 continue;
515 }
516
517 while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
519 let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
521 let _node = node.clone();
522 tokio::task::spawn_blocking(move || {
524 match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
526 Ok(_) => {
527 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
529 }
530 Err(error) => {
531 debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
532 }
533 }
534 _node.num_verifying_executions.fetch_sub(1, Relaxed);
536 });
537 if previous_counter + 1 >= VM::<N, C>::MAX_PARALLEL_EXECUTE_VERIFICATIONS {
539 break;
540 }
541 }
542 }
543 }));
544 }
545
546 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
548 self.handles.lock().push(tokio::spawn(future));
549 }
550}
551
552#[async_trait]
553impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
554 async fn shut_down(&self) {
556 info!("Shutting down...");
557
558 trace!("Shutting down the node...");
560 self.shutdown.store(true, std::sync::atomic::Ordering::Release);
561
562 trace!("Shutting down the client...");
564 self.handles.lock().iter().for_each(|handle| handle.abort());
565
566 self.router.shut_down().await;
568
569 info!("Node has shut down.");
570 }
571}