1mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::ledger_service::CoreLedgerService;
21use snarkos_node_rest::Rest;
22use snarkos_node_router::{
23 Heartbeat,
24 Inbound,
25 Outbound,
26 Router,
27 Routing,
28 messages::{Message, NodeType, UnconfirmedSolution, UnconfirmedTransaction},
29};
30use snarkos_node_sync::{BlockSync, BlockSyncMode};
31use snarkos_node_tcp::{
32 P2P,
33 protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
34};
35use snarkvm::{
36 console::network::Network,
37 ledger::{
38 Ledger,
39 block::{Block, Header},
40 puzzle::{Puzzle, Solution, SolutionID},
41 store::ConsensusStorage,
42 },
43 prelude::block::Transaction,
44};
45
46use aleo_std::StorageMode;
47use anyhow::Result;
48use core::future::Future;
49use lru::LruCache;
50use parking_lot::Mutex;
51use std::{
52 net::SocketAddr,
53 num::NonZeroUsize,
54 sync::{
55 Arc,
56 atomic::{
57 AtomicBool,
58 AtomicUsize,
59 Ordering::{Acquire, Relaxed},
60 },
61 },
62 time::Duration,
63};
64use tokio::{task::JoinHandle, time::sleep};
65
/// Upper bound on the number of deployment verifications running at the same time.
const MAX_PARALLEL_DEPLOY_VERIFICATIONS: usize = 5;
/// Upper bound on the number of execution verifications running at the same time.
const MAX_PARALLEL_EXECUTE_VERIFICATIONS: usize = 1000;
/// Upper bound on the number of solution verifications running at the same time.
const MAX_PARALLEL_SOLUTION_VERIFICATIONS: usize = 20;
/// Number of entries (2^10) held by the LRU queue of unverified deployments.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1024;
/// Number of entries (2^10) held by the LRU queue of unverified executions.
const CAPACITY_FOR_EXECUTIONS: usize = 1024;
/// Number of entries (2^10) held by the LRU queue of unverified solutions.
const CAPACITY_FOR_SOLUTIONS: usize = 1024;
84
85type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
88type SolutionContents<N> = (SocketAddr, UnconfirmedSolution<N>, Solution<N>);
91
92#[derive(Clone)]
94pub struct Client<N: Network, C: ConsensusStorage<N>> {
95 ledger: Ledger<N, C>,
97 router: Router<N>,
99 rest: Option<Rest<N, C, Self>>,
101 sync: Arc<BlockSync<N>>,
103 genesis: Block<N>,
105 puzzle: Puzzle<N>,
107 solution_queue: Arc<Mutex<LruCache<SolutionID<N>, SolutionContents<N>>>>,
109 deploy_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
111 execute_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
113 num_verifying_solutions: Arc<AtomicUsize>,
115 num_verifying_deploys: Arc<AtomicUsize>,
117 num_verifying_executions: Arc<AtomicUsize>,
119 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
121 shutdown: Arc<AtomicBool>,
123}
124
125impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
126 pub async fn new(
128 node_ip: SocketAddr,
129 rest_ip: Option<SocketAddr>,
130 rest_rps: u32,
131 account: Account<N>,
132 trusted_peers: &[SocketAddr],
133 genesis: Block<N>,
134 cdn: Option<String>,
135 storage_mode: StorageMode,
136 rotate_external_peers: bool,
137 shutdown: Arc<AtomicBool>,
138 ) -> Result<Self> {
139 let signal_node = Self::handle_signals(shutdown.clone());
141
142 let ledger = Ledger::<N, C>::load(genesis.clone(), storage_mode.clone())?;
144
145 if let Some(base_url) = cdn {
147 if let Err((_, error)) =
149 snarkos_node_cdn::sync_ledger_with_cdn(&base_url, ledger.clone(), shutdown.clone()).await
150 {
151 crate::log_clean_error(&storage_mode);
152 return Err(error);
153 }
154 }
155
156 let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), shutdown.clone()));
158 let allow_external_peers = true;
160
161 let router = Router::new(
163 node_ip,
164 NodeType::Client,
165 account,
166 trusted_peers,
167 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
168 rotate_external_peers,
169 allow_external_peers,
170 matches!(storage_mode, StorageMode::Development(_)),
171 )
172 .await?;
173
174 let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone());
176
177 let mut node = Self {
179 ledger: ledger.clone(),
180 router,
181 rest: None,
182 sync: Arc::new(sync),
183 genesis,
184 puzzle: ledger.puzzle().clone(),
185 solution_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
186 deploy_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()))),
187 execute_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()))),
188 num_verifying_solutions: Default::default(),
189 num_verifying_deploys: Default::default(),
190 num_verifying_executions: Default::default(),
191 handles: Default::default(),
192 shutdown,
193 };
194
195 if let Some(rest_ip) = rest_ip {
197 node.rest = Some(Rest::start(rest_ip, rest_rps, None, ledger.clone(), Arc::new(node.clone())).await?);
198 }
199 node.initialize_routing().await;
201 node.initialize_sync();
203 node.initialize_solution_verification();
205 node.initialize_deploy_verification();
207 node.initialize_execute_verification();
209 node.handles.lock().push(crate::start_notification_message_loop());
211 let _ = signal_node.set(node.clone());
213 Ok(node)
215 }
216
217 pub fn ledger(&self) -> &Ledger<N, C> {
219 &self.ledger
220 }
221
222 pub fn rest(&self) -> &Option<Rest<N, C, Self>> {
224 &self.rest
225 }
226}
227
228impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
229 fn initialize_sync(&self) {
231 let node = self.clone();
233 self.handles.lock().push(tokio::spawn(async move {
234 loop {
235 if node.shutdown.load(std::sync::atomic::Ordering::Acquire) {
237 info!("Shutting down block production");
238 break;
239 }
240
241 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
243 node.sync.try_block_sync(&node).await;
245 }
246 }));
247 }
248
249 fn initialize_solution_verification(&self) {
251 let node = self.clone();
253 self.handles.lock().push(tokio::spawn(async move {
254 loop {
255 if node.shutdown.load(Acquire) {
257 info!("Shutting down solution verification");
258 break;
259 }
260
261 let queue_is_empty = node.solution_queue.lock().is_empty();
263 let counter_is_full = node.num_verifying_solutions.load(Acquire) >= MAX_PARALLEL_SOLUTION_VERIFICATIONS;
265
266 if queue_is_empty || counter_is_full {
268 sleep(Duration::from_millis(50)).await;
269 continue;
270 }
271
272 let mut solution_queue = node.solution_queue.lock();
274 while let Some((_, (peer_ip, serialized, solution))) = solution_queue.pop_lru() {
275 let previous_counter = node.num_verifying_solutions.fetch_add(1, Relaxed);
277 let _node = node.clone();
278 tokio::task::spawn_blocking(move || {
280 if let Ok(epoch_hash) = _node.ledger.latest_epoch_hash() {
282 let proof_target = _node.ledger.latest_block().header().proof_target();
284 let is_valid = _node.puzzle.check_solution(&solution, epoch_hash, proof_target);
286
287 match is_valid {
288 Ok(()) => {
290 let message = Message::UnconfirmedSolution(serialized);
291 _node.propagate(message, &[peer_ip]);
293 }
294 Err(error) => {
296 if _node.ledger.latest_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
297 debug!("Failed to verify the solution from peer_ip {peer_ip} - {error}")
298 }
299 }
300 }
301 } else {
302 warn!("Failed to retrieve the latest epoch hash.");
303 }
304 _node.num_verifying_solutions.fetch_sub(1, Relaxed);
306 });
307 if previous_counter + 1 >= MAX_PARALLEL_SOLUTION_VERIFICATIONS {
309 break;
310 }
311 }
312 }
313 }));
314 }
315
316 fn initialize_deploy_verification(&self) {
318 let node = self.clone();
320 self.handles.lock().push(tokio::spawn(async move {
321 loop {
322 if node.shutdown.load(Acquire) {
324 info!("Shutting down deployment verification");
325 break;
326 }
327
328 let queue_is_empty = node.deploy_queue.lock().is_empty();
330 let counter_is_full = node.num_verifying_deploys.load(Acquire) >= MAX_PARALLEL_DEPLOY_VERIFICATIONS;
332
333 if queue_is_empty || counter_is_full {
335 sleep(Duration::from_millis(50)).await;
336 continue;
337 }
338
339 while let Some((_, (peer_ip, serialized, transaction))) = node.deploy_queue.lock().pop_lru() {
341 let previous_counter = node.num_verifying_deploys.fetch_add(1, Relaxed);
343 let _node = node.clone();
344 tokio::task::spawn_blocking(move || {
346 match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
348 Ok(_) => {
349 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
351 }
352 Err(error) => {
353 debug!("Failed to verify the deployment from peer_ip {peer_ip} - {error}");
354 }
355 }
356 _node.num_verifying_deploys.fetch_sub(1, Relaxed);
358 });
359 if previous_counter + 1 >= MAX_PARALLEL_DEPLOY_VERIFICATIONS {
361 break;
362 }
363 }
364 }
365 }));
366 }
367
368 fn initialize_execute_verification(&self) {
370 let node = self.clone();
372 self.handles.lock().push(tokio::spawn(async move {
373 loop {
374 if node.shutdown.load(Acquire) {
376 info!("Shutting down execution verification");
377 break;
378 }
379
380 let queue_is_empty = node.execute_queue.lock().is_empty();
382 let counter_is_full = node.num_verifying_executions.load(Acquire) >= MAX_PARALLEL_EXECUTE_VERIFICATIONS;
384
385 if queue_is_empty || counter_is_full {
387 sleep(Duration::from_millis(50)).await;
388 continue;
389 }
390
391 while let Some((_, (peer_ip, serialized, transaction))) = node.execute_queue.lock().pop_lru() {
393 let previous_counter = node.num_verifying_executions.fetch_add(1, Relaxed);
395 let _node = node.clone();
396 tokio::task::spawn_blocking(move || {
398 match _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()) {
400 Ok(_) => {
401 _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
403 }
404 Err(error) => {
405 debug!("Failed to verify the execution from peer_ip {peer_ip} - {error}");
406 }
407 }
408 _node.num_verifying_executions.fetch_sub(1, Relaxed);
410 });
411 if previous_counter + 1 >= MAX_PARALLEL_EXECUTE_VERIFICATIONS {
413 break;
414 }
415 }
416 }
417 }));
418 }
419
420 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
422 self.handles.lock().push(tokio::spawn(future));
423 }
424}
425
426#[async_trait]
427impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
428 async fn shut_down(&self) {
430 info!("Shutting down...");
431
432 trace!("Shutting down the node...");
434 self.shutdown.store(true, std::sync::atomic::Ordering::Release);
435
436 trace!("Shutting down the validator...");
438 self.handles.lock().iter().for_each(|handle| handle.abort());
439
440 self.router.shut_down().await;
442
443 info!("Node has shut down.");
444 }
445}