1mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::ledger_service::ProverLedgerService;
21use snarkos_node_router::{
22 Heartbeat,
23 Inbound,
24 Outbound,
25 Router,
26 Routing,
27 messages::{Message, NodeType, UnconfirmedSolution},
28};
29use snarkos_node_sync::{BlockSync, BlockSyncMode};
30use snarkos_node_tcp::{
31 P2P,
32 protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
33};
34use snarkvm::{
35 ledger::narwhal::Data,
36 prelude::{
37 Network,
38 block::{Block, Header},
39 puzzle::{Puzzle, Solution},
40 store::ConsensusStorage,
41 },
42 synthesizer::VM,
43};
44
45use aleo_std::StorageMode;
46use anyhow::Result;
47use colored::Colorize;
48use core::{marker::PhantomData, time::Duration};
49#[cfg(feature = "locktick")]
50use locktick::parking_lot::{Mutex, RwLock};
51#[cfg(not(feature = "locktick"))]
52use parking_lot::{Mutex, RwLock};
53use rand::{CryptoRng, Rng, rngs::OsRng};
54use snarkos_node_bft::helpers::fmt_id;
55use std::{
56 net::SocketAddr,
57 sync::{
58 Arc,
59 atomic::{AtomicBool, AtomicU8, Ordering},
60 },
61};
62use tokio::task::JoinHandle;
63
64#[derive(Clone)]
66pub struct Prover<N: Network, C: ConsensusStorage<N>> {
67 router: Router<N>,
69 sync: Arc<BlockSync<N>>,
71 genesis: Block<N>,
73 puzzle: Puzzle<N>,
75 latest_epoch_hash: Arc<RwLock<Option<N::BlockHash>>>,
77 latest_block_header: Arc<RwLock<Option<Header<N>>>>,
79 puzzle_instances: Arc<AtomicU8>,
81 max_puzzle_instances: u8,
83 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
85 shutdown: Arc<AtomicBool>,
87 _phantom: PhantomData<C>,
89}
90
91impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
92 pub async fn new(
94 node_ip: SocketAddr,
95 account: Account<N>,
96 trusted_peers: &[SocketAddr],
97 genesis: Block<N>,
98 storage_mode: StorageMode,
99 shutdown: Arc<AtomicBool>,
100 ) -> Result<Self> {
101 let signal_node = Self::handle_signals(shutdown.clone());
103
104 let ledger_service = Arc::new(ProverLedgerService::new());
106 let allow_external_peers = true;
108 let rotate_external_peers = false;
110
111 let router = Router::new(
113 node_ip,
114 NodeType::Prover,
115 account,
116 ledger_service.clone(),
117 trusted_peers,
118 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
119 rotate_external_peers,
120 allow_external_peers,
121 matches!(storage_mode, StorageMode::Development(_)),
122 )
123 .await?;
124
125 let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone());
127
128 let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
130 let node = Self {
132 router,
133 sync: Arc::new(sync),
134 genesis,
135 puzzle: VM::<N, C>::new_puzzle()?,
136 latest_epoch_hash: Default::default(),
137 latest_block_header: Default::default(),
138 puzzle_instances: Default::default(),
139 max_puzzle_instances: u8::try_from(max_puzzle_instances)?,
140 handles: Default::default(),
141 shutdown,
142 _phantom: Default::default(),
143 };
144 node.initialize_routing().await;
146 node.initialize_puzzle().await;
148 node.handles.lock().push(crate::start_notification_message_loop());
150 let _ = signal_node.set(node.clone());
152 Ok(node)
154 }
155}
156
157#[async_trait]
158impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Prover<N, C> {
159 async fn shut_down(&self) {
161 info!("Shutting down...");
162
163 debug!("Shutting down the puzzle...");
165 self.shutdown.store(true, Ordering::Release);
166
167 debug!("Shutting down the prover...");
169 self.handles.lock().iter().for_each(|handle| handle.abort());
170
171 self.router.shut_down().await;
173
174 info!("Node has shut down.");
175 }
176}
177
178impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
179 async fn initialize_puzzle(&self) {
181 for _ in 0..self.max_puzzle_instances {
182 let prover = self.clone();
183 self.handles.lock().push(tokio::spawn(async move {
184 prover.puzzle_loop().await;
185 }));
186 }
187 }
188
189 async fn puzzle_loop(&self) {
191 loop {
192 if self.router.number_of_connected_peers() == 0 {
194 debug!("Skipping an iteration of the puzzle (no connected peers)");
195 tokio::time::sleep(Duration::from_secs(N::ANCHOR_TIME as u64)).await;
196 continue;
197 }
198
199 if self.num_puzzle_instances() > self.max_puzzle_instances {
201 tokio::time::sleep(Duration::from_millis(500)).await;
203 continue;
204 }
205
206 let latest_epoch_hash = *self.latest_epoch_hash.read();
208 let latest_state = self
210 .latest_block_header
211 .read()
212 .as_ref()
213 .map(|header| (header.coinbase_target(), header.proof_target()));
214
215 if let (Some(epoch_hash), Some((coinbase_target, proof_target))) = (latest_epoch_hash, latest_state) {
217 let prover = self.clone();
219 let result = tokio::task::spawn_blocking(move || {
220 prover.puzzle_iteration(epoch_hash, coinbase_target, proof_target, &mut OsRng)
221 })
222 .await;
223
224 if let Ok(Some((solution_target, solution))) = result {
226 info!("Found a Solution '{}' (Proof Target {solution_target})", solution.id());
227 self.broadcast_solution(solution);
229 }
230 } else {
231 tokio::time::sleep(Duration::from_secs(1)).await;
233 }
234
235 if self.shutdown.load(Ordering::Acquire) {
237 debug!("Shutting down the puzzle...");
238 break;
239 }
240 }
241 }
242
243 fn puzzle_iteration<R: Rng + CryptoRng>(
245 &self,
246 epoch_hash: N::BlockHash,
247 coinbase_target: u64,
248 proof_target: u64,
249 rng: &mut R,
250 ) -> Option<(u64, Solution<N>)> {
251 self.increment_puzzle_instances();
253
254 debug!(
255 "Proving 'Puzzle' for Epoch '{}' {}",
256 fmt_id(epoch_hash),
257 format!("(Coinbase Target {coinbase_target}, Proof Target {proof_target})").dimmed()
258 );
259
260 let result =
262 self.puzzle.prove(epoch_hash, self.address(), rng.gen(), Some(proof_target)).ok().and_then(|solution| {
263 self.puzzle.get_proof_target(&solution).ok().map(|solution_target| (solution_target, solution))
264 });
265
266 self.decrement_puzzle_instances();
268 result
270 }
271
272 fn broadcast_solution(&self, solution: Solution<N>) {
274 let message = Message::UnconfirmedSolution(UnconfirmedSolution {
276 solution_id: solution.id(),
277 solution: Data::Object(solution),
278 });
279 self.propagate(message, &[]);
281 }
282
283 fn num_puzzle_instances(&self) -> u8 {
285 self.puzzle_instances.load(Ordering::Relaxed)
286 }
287
288 fn increment_puzzle_instances(&self) {
290 self.puzzle_instances.fetch_add(1, Ordering::Relaxed);
291 #[cfg(debug_assertions)]
292 trace!("Number of Instances - {}", self.num_puzzle_instances());
293 }
294
295 fn decrement_puzzle_instances(&self) {
297 self.puzzle_instances.fetch_sub(1, Ordering::Relaxed);
298 #[cfg(debug_assertions)]
299 trace!("Number of Instances - {}", self.num_puzzle_instances());
300 }
301}