1mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::ledger_service::ProverLedgerService;
21use snarkos_node_router::{
22 Heartbeat,
23 Inbound,
24 Outbound,
25 Router,
26 Routing,
27 messages::{Message, NodeType, UnconfirmedSolution},
28};
29use snarkos_node_sync::{BlockSync, BlockSyncMode};
30use snarkos_node_tcp::{
31 P2P,
32 protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
33};
34use snarkvm::{
35 ledger::narwhal::Data,
36 prelude::{
37 Network,
38 block::{Block, Header},
39 puzzle::{Puzzle, Solution},
40 store::ConsensusStorage,
41 },
42 synthesizer::VM,
43};
44
45use aleo_std::StorageMode;
46use anyhow::Result;
47use colored::Colorize;
48use core::{marker::PhantomData, time::Duration};
49use parking_lot::{Mutex, RwLock};
50use rand::{CryptoRng, Rng, rngs::OsRng};
51use snarkos_node_bft::helpers::fmt_id;
52use std::{
53 net::SocketAddr,
54 sync::{
55 Arc,
56 atomic::{AtomicBool, AtomicU8, Ordering},
57 },
58};
59use tokio::task::JoinHandle;
60
61#[derive(Clone)]
63pub struct Prover<N: Network, C: ConsensusStorage<N>> {
64 router: Router<N>,
66 sync: Arc<BlockSync<N>>,
68 genesis: Block<N>,
70 puzzle: Puzzle<N>,
72 latest_epoch_hash: Arc<RwLock<Option<N::BlockHash>>>,
74 latest_block_header: Arc<RwLock<Option<Header<N>>>>,
76 puzzle_instances: Arc<AtomicU8>,
78 max_puzzle_instances: u8,
80 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
82 shutdown: Arc<AtomicBool>,
84 _phantom: PhantomData<C>,
86}
87
88impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
89 pub async fn new(
91 node_ip: SocketAddr,
92 account: Account<N>,
93 trusted_peers: &[SocketAddr],
94 genesis: Block<N>,
95 storage_mode: StorageMode,
96 shutdown: Arc<AtomicBool>,
97 ) -> Result<Self> {
98 let signal_node = Self::handle_signals(shutdown.clone());
100
101 let ledger_service = Arc::new(ProverLedgerService::new());
103 let allow_external_peers = true;
105 let rotate_external_peers = false;
107
108 let router = Router::new(
110 node_ip,
111 NodeType::Prover,
112 account,
113 trusted_peers,
114 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
115 rotate_external_peers,
116 allow_external_peers,
117 matches!(storage_mode, StorageMode::Development(_)),
118 )
119 .await?;
120
121 let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone());
123
124 let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
126 let node = Self {
128 router,
129 sync: Arc::new(sync),
130 genesis,
131 puzzle: VM::<N, C>::new_puzzle()?,
132 latest_epoch_hash: Default::default(),
133 latest_block_header: Default::default(),
134 puzzle_instances: Default::default(),
135 max_puzzle_instances: u8::try_from(max_puzzle_instances)?,
136 handles: Default::default(),
137 shutdown,
138 _phantom: Default::default(),
139 };
140 node.initialize_routing().await;
142 node.initialize_puzzle().await;
144 node.handles.lock().push(crate::start_notification_message_loop());
146 let _ = signal_node.set(node.clone());
148 Ok(node)
150 }
151}
152
153#[async_trait]
154impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Prover<N, C> {
155 async fn shut_down(&self) {
157 info!("Shutting down...");
158
159 debug!("Shutting down the puzzle...");
161 self.shutdown.store(true, Ordering::Release);
162
163 debug!("Shutting down the prover...");
165 self.handles.lock().iter().for_each(|handle| handle.abort());
166
167 self.router.shut_down().await;
169
170 info!("Node has shut down.");
171 }
172}
173
174impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
175 async fn initialize_puzzle(&self) {
177 for _ in 0..self.max_puzzle_instances {
178 let prover = self.clone();
179 self.handles.lock().push(tokio::spawn(async move {
180 prover.puzzle_loop().await;
181 }));
182 }
183 }
184
185 async fn puzzle_loop(&self) {
187 loop {
188 if self.router.number_of_connected_peers() == 0 {
190 debug!("Skipping an iteration of the puzzle (no connected peers)");
191 tokio::time::sleep(Duration::from_secs(N::ANCHOR_TIME as u64)).await;
192 continue;
193 }
194
195 if self.num_puzzle_instances() > self.max_puzzle_instances {
197 tokio::time::sleep(Duration::from_millis(500)).await;
199 continue;
200 }
201
202 let latest_epoch_hash = *self.latest_epoch_hash.read();
204 let latest_state = self
206 .latest_block_header
207 .read()
208 .as_ref()
209 .map(|header| (header.coinbase_target(), header.proof_target()));
210
211 if let (Some(epoch_hash), Some((coinbase_target, proof_target))) = (latest_epoch_hash, latest_state) {
213 let prover = self.clone();
215 let result = tokio::task::spawn_blocking(move || {
216 prover.puzzle_iteration(epoch_hash, coinbase_target, proof_target, &mut OsRng)
217 })
218 .await;
219
220 if let Ok(Some((solution_target, solution))) = result {
222 info!("Found a Solution '{}' (Proof Target {solution_target})", solution.id());
223 self.broadcast_solution(solution);
225 }
226 } else {
227 tokio::time::sleep(Duration::from_secs(1)).await;
229 }
230
231 if self.shutdown.load(Ordering::Acquire) {
233 debug!("Shutting down the puzzle...");
234 break;
235 }
236 }
237 }
238
239 fn puzzle_iteration<R: Rng + CryptoRng>(
241 &self,
242 epoch_hash: N::BlockHash,
243 coinbase_target: u64,
244 proof_target: u64,
245 rng: &mut R,
246 ) -> Option<(u64, Solution<N>)> {
247 self.increment_puzzle_instances();
249
250 debug!(
251 "Proving 'Puzzle' for Epoch '{}' {}",
252 fmt_id(epoch_hash),
253 format!("(Coinbase Target {coinbase_target}, Proof Target {proof_target})").dimmed()
254 );
255
256 let result =
258 self.puzzle.prove(epoch_hash, self.address(), rng.gen(), Some(proof_target)).ok().and_then(|solution| {
259 self.puzzle.get_proof_target(&solution).ok().map(|solution_target| (solution_target, solution))
260 });
261
262 self.decrement_puzzle_instances();
264 result
266 }
267
268 fn broadcast_solution(&self, solution: Solution<N>) {
270 let message = Message::UnconfirmedSolution(UnconfirmedSolution {
272 solution_id: solution.id(),
273 solution: Data::Object(solution),
274 });
275 self.propagate(message, &[]);
277 }
278
279 fn num_puzzle_instances(&self) -> u8 {
281 self.puzzle_instances.load(Ordering::Relaxed)
282 }
283
284 fn increment_puzzle_instances(&self) {
286 self.puzzle_instances.fetch_add(1, Ordering::Relaxed);
287 #[cfg(debug_assertions)]
288 trace!("Number of Instances - {}", self.num_puzzle_instances());
289 }
290
291 fn decrement_puzzle_instances(&self) {
293 self.puzzle_instances.fetch_sub(1, Ordering::Relaxed);
294 #[cfg(debug_assertions)]
295 trace!("Number of Instances - {}", self.num_puzzle_instances());
296 }
297}