mod router;

use crate::traits::NodeInterface;

use snarkos_account::Account;
use snarkos_node_bft::{events::DataBlocks, helpers::fmt_id, ledger_service::CoreLedgerService};
use snarkos_node_cdn::CdnBlockSync;
use snarkos_node_rest::Rest;
use snarkos_node_router::{
    Heartbeat,
    Inbound,
    Outbound,
    Router,
    Routing,
    messages::{Message, NodeType, UnconfirmedSolution, UnconfirmedTransaction},
};
use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
use snarkos_node_tcp::{
    P2P,
    protocols::{Disconnect, Handshake, OnConnect, Reading},
};
use snarkvm::{
    console::network::Network,
    ledger::{
        Ledger,
        block::{Block, Header},
        puzzle::{Puzzle, Solution, SolutionID},
        store::ConsensusStorage,
    },
    prelude::block::Transaction,
};

use aleo_std::StorageMode;
use anyhow::Result;
use core::future::Future;
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::parking_lot::Mutex;
use lru::LruCache;
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
use std::{
    net::SocketAddr,
    num::NonZeroUsize,
    sync::{
        Arc,
        atomic::{
            AtomicBool,
            AtomicUsize,
            Ordering::{Acquire, Relaxed},
        },
    },
    time::{Duration, Instant},
};
use tokio::{
    task::JoinHandle,
    time::{sleep, timeout},
};

/// The maximum number of deployments to verify in parallel.
const MAX_PARALLEL_DEPLOY_VERIFICATIONS: usize = 5;
/// The maximum number of executions to verify in parallel.
const MAX_PARALLEL_EXECUTE_VERIFICATIONS: usize = 1000;
/// The maximum number of solutions to verify in parallel.
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
/// The capacity of the queue for pending deployments.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity of the queue for pending executions.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity of the queue for pending solutions.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;

/// A tuple of the peer IP, the serialized transaction message, and the deserialized transaction.
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
/// A tuple of the peer IP, the serialized solution message, and the deserialized solution.
type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);

/// A client node, which verifies and propagates unconfirmed solutions and transactions,
/// and synchronizes its ledger with the network.
#[derive(Clone)]
pub struct Client<N: Network, C: ConsensusStorage<N>> {
    /// The ledger of the node.
    ledger: Ledger<N, C>,
    /// The router of the node.
    router: Router<N>,
    /// The REST server of the node, if one was started.
    rest: Option<Rest<N, C, Self>>,
    /// The block synchronization module.
    sync: Arc<BlockSync<N>>,
    /// The genesis block.
    genesis: Block<N>,
    /// The puzzle, used to verify solutions.
    puzzle: Puzzle<N>,
    /// The LRU queue of unconfirmed solutions awaiting verification.
    solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
    /// The LRU queue of unconfirmed deployments awaiting verification.
    deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The LRU queue of unconfirmed executions awaiting verification.
    execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
    /// The number of solution verifications currently in flight.
    num_verifying_solutions: Arc<AtomicUsize>,
    /// The number of deployment verifications currently in flight.
    num_verifying_deploys: Arc<AtomicUsize>,
    /// The number of execution verifications currently in flight.
    num_verifying_executions: Arc<AtomicUsize>,
    /// The handles of the spawned background tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The shutdown signal.
    shutdown: Arc<AtomicBool>,
    /// The ping handler, used to keep peers updated with this node's block locators.
    ping: Arc<Ping<N>>,
}

impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
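    /// Initializes a new client node with the given configuration.
    ///
    /// A minimal construction sketch (illustrative only, not compiled; assumes `node_ip`,
    /// `account`, `genesis`, `storage_mode`, and `shutdown` are prepared by the caller):
    /// ```ignore
    /// let client = Client::<N, C>::new(
    ///     node_ip, None, 10, account, &[], genesis, None, storage_mode, false, shutdown,
    /// ).await?;
    /// ```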
    pub async fn new(
        node_ip: SocketAddr,
        rest_ip: Option<SocketAddr>,
        rest_rps: u32,
        account: Account<N>,
        trusted_peers: &[SocketAddr],
        genesis: Block<N>,
        cdn: Option<String>,
        storage_mode: StorageMode,
        rotate_external_peers: bool,
        shutdown: Arc<AtomicBool>,
    ) -> Result<Self> {
        // Initialize the signal handler.
        let signal_node = Self::handle_signals(shutdown.clone());

        // Initialize the ledger from the genesis block and storage mode.
        let ledger = Ledger::<N, C>::load(genesis.clone(), storage_mode.clone())?;

        // Initialize the ledger service.
        let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), shutdown.clone()));
        let allow_external_peers = true;

        // Initialize the router.
        let router = Router::new(
            node_ip,
            NodeType::Client,
            account,
            ledger_service.clone(),
            trusted_peers,
            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
            rotate_external_peers,
            allow_external_peers,
            matches!(storage_mode, StorageMode::Development(_)),
        )
        .await?;

        // Initialize the block synchronization module.
        let sync = Arc::new(BlockSync::new(ledger_service.clone()));

        // Initialize the ping handler with the current block locators.
        let locators = sync.get_block_locators()?;
        let ping = Arc::new(Ping::new(router.clone(), locators));

        // Initialize the node.
        let mut node = Self {
            ledger: ledger.clone(),
            router,
            rest: None,
            sync: sync.clone(),
            genesis,
            ping,
            puzzle: ledger.puzzle().clone(),
            solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
            execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
            num_verifying_solutions: Default::default(),
            num_verifying_deploys: Default::default(),
            num_verifying_executions: Default::default(),
            handles: Default::default(),
            shutdown: shutdown.clone(),
        };

        // Initialize the CDN block sync, if a CDN base URL was provided.
        let cdn_sync = cdn.map(|base_url| {
            trace!("CDN sync is enabled");
            Arc::new(CdnBlockSync::new(base_url, ledger.clone(), shutdown))
        });

        // Initialize the REST server, if a REST IP was provided.
        if let Some(rest_ip) = rest_ip {
            node.rest = Some(
                Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone()), cdn_sync.clone(), sync)
                    .await?,
            );
        }

        // Wait for the CDN sync to complete before starting the node's own tasks.
        if let Some(cdn_sync) = cdn_sync {
            if let Err(error) = cdn_sync.wait().await {
                crate::log_clean_error(&storage_mode);
                node.shut_down().await;
                return Err(error);
            }
        }

        // Initialize routing, block sync, and the verification workers.
        node.initialize_routing().await;
        node.initialize_sync();
        node.initialize_solution_verification();
        node.initialize_deploy_verification();
        node.initialize_execute_verification();
        // Start the notification message loop.
        node.handles.lock().push(crate::start_notification_message_loop());
        // Pass the node to the signal handler and return it.
        let _ = signal_node.set(node.clone());
        Ok(node)
    }

    /// Returns the ledger.
    pub fn ledger(&self) -> &Ledger<N, C> {
        &self.ledger
    }

    /// Returns the REST server, if it was started.
    pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
        &self.rest
    }
}

impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
    /// The interval at which the block sync routine is run.
    const SYNC_INTERVAL: Duration = Duration::from_secs(5);

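    /// Spawns a background task that runs the block sync routine on a fixed cadence
    /// (roughly every `SYNC_INTERVAL`) until the shutdown signal is set.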
    fn initialize_sync(&self) {
        let _self = self.clone();
        let mut last_update = Instant::now();

        self.handles.lock().push(tokio::spawn(async move {
            loop {
                // If the shutdown signal is set, stop the loop.
                if _self.shutdown.load(std::sync::atomic::Ordering::Acquire) {
                    info!("Shutting down block synchronization");
                    break;
                }

                // Sleep for the remainder of the sync interval, if any time is left.
                let now = Instant::now();
                let elapsed = now.saturating_duration_since(last_update);
                let sleep_time = Self::SYNC_INTERVAL.saturating_sub(elapsed);

                if !sleep_time.is_zero() {
                    sleep(sleep_time).await;
                }

                // Run the block sync routine.
                _self.try_block_sync().await;
                last_update = now;
            }
        }));
    }

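    /// Runs one iteration of the block sync routine: waits (up to `SYNC_INTERVAL`) for a sync
    /// update, refreshes the sync height, re-issues timed-out block requests, and then either
    /// sends new block requests or tries to advance block synchronization with the responses
    /// received so far.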
    async fn try_block_sync(&self) {
        let _ = timeout(Self::SYNC_INTERVAL, self.sync.wait_for_update()).await;

        self.sync.set_sync_height(self.ledger.latest_height());

        let new_requests = self.sync.handle_block_request_timeouts(self);
        if let Some((block_requests, sync_peers)) = new_requests {
            self.send_block_requests(block_requests, sync_peers).await;
        }

        if !self.sync.can_block_sync() {
            return;
        }

        let (block_requests, sync_peers) = self.sync.prepare_block_requests();

        if block_requests.is_empty() && self.sync.has_pending_responses() {
            trace!("No block requests to send. Will process pending responses.");
            let has_new_blocks = match self.sync.try_advancing_block_synchronization().await {
                Ok(val) => val,
                Err(err) => {
                    error!("{err}");
                    return;
                }
            };

            if has_new_blocks {
                match self.sync.get_block_locators() {
                    Ok(locators) => self.ping.update_block_locators(locators),
                    Err(err) => error!("Failed to get block locators: {err}"),
                }
            }
        } else if block_requests.is_empty() {
            let total_requests = self.sync.num_total_block_requests();
            let num_outstanding = self.sync.num_outstanding_block_requests();
            if total_requests > 0 {
                trace!(
                    "Not block synced yet, but there are still {total_requests} in-flight requests. {num_outstanding} are still awaiting responses."
                );
            } else {
                debug!(
                    "Not block synced yet, and there are no outstanding block requests or \
                     new block requests to send"
                );
            }
        } else {
            self.send_block_requests(block_requests, sync_peers).await;
        }
    }

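    /// Sends the given block requests to the sync peers in batches of at most
    /// `DataBlocks::MAXIMUM_NUMBER_OF_BLOCKS` requests, pausing for `BLOCK_REQUEST_BATCH_DELAY`
    /// between batches, and stopping early if a batch fails to send.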
    async fn send_block_requests(
        &self,
        block_requests: Vec<(u32, PrepareSyncRequest<N>)>,
        sync_peers: IndexMap<SocketAddr, BlockLocators<N>>,
    ) {
        for requests in block_requests.chunks(DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as usize) {
            if !self.sync.send_block_requests(self, &sync_peers, requests).await {
                break;
            }

            tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await;
        }
    }

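    /// Spawns a background task that drains the solution queue and verifies solutions in
    /// parallel (up to `MAX_PARALLEL_SOLUTION_VERIFICATIONS` at a time), propagating each
    /// valid solution to all peers except the one it was received from.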
    fn initialize_solution_verification(&self) {
        let node = self.clone();
        self.handles.lock().push(tokio::spawn(async move {
            loop {
                // If the shutdown signal is set, stop the loop.
                if node.shutdown.load(Acquire) {
                    info!("Shutting down solution verification");
                    break;
                }

                let queue_is_empty = node.solution_queue.lock().is_empty();
                let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;

                // If there is nothing to verify, or the verification slots are full, wait briefly.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                let mut solution_queue = node.solution_queue.lock();
                while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
                    // Increment the verification counter and verify the solution on a blocking thread.
                    let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    tokio::task::spawn_blocking(move || {
                        if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
                            let prover_address = solution.address();
                            // Note: as written, this check only logs; verification still proceeds below.
                            if _node.ledger.is_solution_limit_reached(&prover_address, 0) {
                                debug!("Invalid Solution '{}' - Prover '{prover_address}' has reached their solution limit for the current epoch", fmt_id(solution.id()));
                            }
                            let proof_target = _node.ledger.latest_block().header().proof_target();
                            let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);

                            match is_valid {
                                Ok(()) => {
                                    // Propagate the valid solution to all peers except the sender.
                                    let message = Message::UnconfirmedSolution(serialized);
                                    _node.propagate(message, &[peer_ip]);
                                }
                                Err(error) => {
                                    // Skip logging during the first blocks of a new epoch, when solutions
                                    // computed for the previous epoch are expected to fail.
                                    if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
                                        debug!("Failed to verify the solution from peer {peer_ip} - {error}")
                                    }
                                }
                            }
                        } else {
                            warn!("Failed to retrieve the latest epoch hash.");
                        }
                        _node.num_verifying_solutions.fetch_sub(1, Relaxed);
                    });
                    // Stop dispatching once the parallel verification limit is reached.
                    if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
                        break;
                    }
                }
            }
        }));
    }

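    /// Spawns a background task that drains the deployment queue and performs basic transaction
    /// checks in parallel (up to `MAX_PARALLEL_DEPLOY_VERIFICATIONS` at a time), propagating
    /// each valid deployment to all peers except the one it was received from.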
    fn initialize_deploy_verification(&self) {
        let node = self.clone();
        self.handles.lock().push(tokio::spawn(async move {
            loop {
                // If the shutdown signal is set, stop the loop.
                if node.shutdown.load(Acquire) {
                    info!("Shutting down deployment verification");
                    break;
                }

                let queue_is_empty = node.deploy_queue.lock().is_empty();
                let counter_is_full = node.num_verifying_deploys.load(Acquire) >= MAX_PARALLEL_DEPLOY_VERIFICATIONS;

                // If there is nothing to verify, or the verification slots are full, wait briefly.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
                    // Increment the verification counter and verify the deployment on a blocking thread.
                    let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    tokio::task::spawn_blocking(move || {
                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
                            Ok(_) => {
                                // Propagate the valid deployment to all peers except the sender.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            }
                            Err(error) => {
                                debug!("Failed to verify the deployment from peer {peer_ip} - {error}");
                            }
                        }
                        _node.num_verifying_deploys.fetch_sub(1, Relaxed);
                    });
                    // Stop dispatching once the parallel verification limit is reached.
                    if previous_counter + 1 >= MAX_PARALLEL_DEPLOY_VERIFICATIONS {
                        break;
                    }
                }
            }
        }));
    }

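    /// Spawns a background task that drains the execution queue and performs basic transaction
    /// checks in parallel (up to `MAX_PARALLEL_EXECUTE_VERIFICATIONS` at a time), propagating
    /// each valid execution to all peers except the one it was received from.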
    fn initialize_execute_verification(&self) {
        let node = self.clone();
        self.handles.lock().push(tokio::spawn(async move {
            loop {
                // If the shutdown signal is set, stop the loop.
                if node.shutdown.load(Acquire) {
                    info!("Shutting down execution verification");
                    break;
                }

                let queue_is_empty = node.execute_queue.lock().is_empty();
                let counter_is_full = node.num_verifying_executions.load(Acquire) >= MAX_PARALLEL_EXECUTE_VERIFICATIONS;

                // If there is nothing to verify, or the verification slots are full, wait briefly.
                if queue_is_empty || counter_is_full {
                    sleep(Duration::from_millis(50)).await;
                    continue;
                }

                while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
                    // Increment the verification counter and verify the execution on a blocking thread.
                    let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
                    let _node = node.clone();
                    tokio::task::spawn_blocking(move || {
                        match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
                            Ok(_) => {
                                // Propagate the valid execution to all peers except the sender.
                                _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
                            }
                            Err(error) => {
                                debug!("Failed to verify the execution from peer {peer_ip} - {error}");
                            }
                        }
                        _node.num_verifying_executions.fetch_sub(1, Relaxed);
                    });
                    // Stop dispatching once the parallel verification limit is reached.
                    if previous_counter + 1 >= MAX_PARALLEL_EXECUTE_VERIFICATIONS {
                        break;
                    }
                }
            }
        }));
    }

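    /// Spawns the given future onto the runtime and stores its handle, so the task is aborted
    /// when the node shuts down.
    ///
    /// A usage sketch (illustrative only, not compiled; assumes `client` is an initialized
    /// `Client<N, C>`):
    /// ```ignore
    /// client.spawn(async move {
    ///     // Periodic or one-off background work goes here.
    /// });
    /// ```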
    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }
}

#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
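    /// Shuts the node down: sets the shutdown signal, aborts the spawned background tasks,
    /// and shuts down the router.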
    async fn shut_down(&self) {
        info!("Shutting down...");

        trace!("Shutting down the node...");
        // Update the shutdown flag so the background loops stop.
        self.shutdown.store(true, std::sync::atomic::Ordering::Release);

        trace!("Shutting down the client...");
        // Abort the spawned background tasks.
        self.handles.lock().iter().for_each(|handle| handle.abort());

        // Shut down the router.
        self.router.shut_down().await;

        info!("Node has shut down.");
    }
}