1mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::ledger_service::ProverLedgerService;
21use snarkos_node_router::{
22 Heartbeat,
23 Inbound,
24 Outbound,
25 PeerPoolHandling,
26 Router,
27 Routing,
28 messages::{Message, NodeType, UnconfirmedSolution},
29};
30use snarkos_node_sync::{BlockSync, Ping};
31use snarkos_node_tcp::{
32 P2P,
33 protocols::{Disconnect, Handshake, OnConnect, Reading},
34};
35use snarkvm::{
36 ledger::narwhal::Data,
37 prelude::{
38 Network,
39 block::{Block, Header},
40 puzzle::{Puzzle, Solution},
41 store::ConsensusStorage,
42 },
43 synthesizer::VM,
44};
45
46use aleo_std::StorageMode;
47use anyhow::Result;
48use colored::Colorize;
49use core::{marker::PhantomData, time::Duration};
50#[cfg(feature = "locktick")]
51use locktick::parking_lot::{Mutex, RwLock};
52#[cfg(not(feature = "locktick"))]
53use parking_lot::{Mutex, RwLock};
54use rand::{CryptoRng, Rng, rngs::OsRng};
55use snarkos_node_bft::helpers::fmt_id;
56use std::{
57 net::SocketAddr,
58 sync::{
59 Arc,
60 atomic::{AtomicBool, AtomicU8, Ordering},
61 },
62};
63use tokio::task::JoinHandle;
64
/// A prover node: it joins the network through a [`Router`] (as `NodeType::Prover`) and runs
/// puzzle loops that broadcast every solution they find.
///
/// The mutable state is held behind `Arc`s, so clones of a prover share it.
#[derive(Clone)]
pub struct Prover<N: Network, C: ConsensusStorage<N>> {
    /// The router of the node.
    router: Router<N>,
    /// The block sync module, built on top of the prover's ledger service.
    sync: Arc<BlockSync<N>>,
    /// The genesis block.
    genesis: Block<N>,
    /// The puzzle used to produce solutions.
    puzzle: Puzzle<N>,
    /// The latest epoch hash, or `None` while it is not known yet.
    /// Read by the puzzle loop; it is not written in this section of the file.
    latest_epoch_hash: Arc<RwLock<Option<N::BlockHash>>>,
    /// The latest block header, or `None` while it is not known yet.
    /// The puzzle loop reads its coinbase target and proof target.
    latest_block_header: Arc<RwLock<Option<Header<N>>>>,
    /// The number of puzzle iterations that are currently running.
    puzzle_instances: Arc<AtomicU8>,
    /// The maximum number of puzzle instances (one puzzle loop is spawned per instance).
    max_puzzle_instances: u8,
    /// The handles of the spawned background tasks; they are aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The shutdown flag; once set, the puzzle loops stop.
    shutdown: Arc<AtomicBool>,
    /// The ping helper, constructed with `Ping::new_nosync`.
    // NOTE(review): presumably drives peer pings without block-sync data - confirm against `Ping`.
    ping: Arc<Ping<N>>,
    /// Marker for the consensus storage type `C`, which is only used at the type level.
    _phantom: PhantomData<C>,
}
93
94impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
95 pub async fn new(
97 node_ip: SocketAddr,
98 account: Account<N>,
99 trusted_peers: &[SocketAddr],
100 genesis: Block<N>,
101 storage_mode: StorageMode,
102 dev: Option<u16>,
103 shutdown: Arc<AtomicBool>,
104 ) -> Result<Self> {
105 let signal_node = Self::handle_signals(shutdown.clone());
107
108 let ledger_service = Arc::new(ProverLedgerService::new());
110 let allow_external_peers = true;
112 let rotate_external_peers = false;
114
115 let router = Router::new(
117 node_ip,
118 NodeType::Prover,
119 account,
120 ledger_service.clone(),
121 trusted_peers,
122 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
123 rotate_external_peers,
124 allow_external_peers,
125 storage_mode,
126 dev.is_some(),
127 )
128 .await?;
129
130 let sync = BlockSync::new(ledger_service.clone());
132
133 let ping = Arc::new(Ping::new_nosync(router.clone()));
135
136 let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
138 let node = Self {
140 router,
141 sync: Arc::new(sync),
142 genesis,
143 puzzle: VM::<N, C>::new_puzzle()?,
144 latest_epoch_hash: Default::default(),
145 latest_block_header: Default::default(),
146 puzzle_instances: Default::default(),
147 max_puzzle_instances: u8::try_from(max_puzzle_instances)?,
148 handles: Default::default(),
149 ping,
150 shutdown,
151 _phantom: Default::default(),
152 };
153 node.initialize_routing().await;
155 node.initialize_puzzle().await;
157 node.handles.lock().push(crate::start_notification_message_loop());
159 let _ = signal_node.set(node.clone());
161 Ok(node)
163 }
164
165 pub fn router(&self) -> &Router<N> {
166 &self.router
167 }
168}
169
170#[async_trait]
171impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Prover<N, C> {
172 async fn shut_down(&self) {
174 info!("Shutting down...");
175
176 debug!("Shutting down the puzzle...");
178 self.shutdown.store(true, Ordering::Release);
179
180 debug!("Shutting down the prover...");
182 self.handles.lock().iter().for_each(|handle| handle.abort());
183
184 self.router.shut_down().await;
186
187 info!("Node has shut down.");
188 }
189}
190
191impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
192 async fn initialize_puzzle(&self) {
194 for _ in 0..self.max_puzzle_instances {
195 let prover = self.clone();
196 self.handles.lock().push(tokio::spawn(async move {
197 prover.puzzle_loop().await;
198 }));
199 }
200 }
201
202 async fn puzzle_loop(&self) {
204 loop {
205 if self.router.number_of_connected_peers() == 0 {
207 debug!("Skipping an iteration of the puzzle (no connected peers)");
208 tokio::time::sleep(Duration::from_secs(N::ANCHOR_TIME as u64)).await;
209 continue;
210 }
211
212 if self.num_puzzle_instances() > self.max_puzzle_instances {
214 tokio::time::sleep(Duration::from_millis(500)).await;
216 continue;
217 }
218
219 let latest_epoch_hash = *self.latest_epoch_hash.read();
221 let latest_state = self
223 .latest_block_header
224 .read()
225 .as_ref()
226 .map(|header| (header.coinbase_target(), header.proof_target()));
227
228 if let (Some(epoch_hash), Some((coinbase_target, proof_target))) = (latest_epoch_hash, latest_state) {
230 let prover = self.clone();
232 let result = tokio::task::spawn_blocking(move || {
233 prover.puzzle_iteration(epoch_hash, coinbase_target, proof_target, &mut OsRng)
234 })
235 .await;
236
237 if let Ok(Some((solution_target, solution))) = result {
239 info!("Found a Solution '{}' (Proof Target {solution_target})", solution.id());
240 self.broadcast_solution(solution);
242 }
243 } else {
244 tokio::time::sleep(Duration::from_secs(1)).await;
246 }
247
248 if self.shutdown.load(Ordering::Acquire) {
250 debug!("Shutting down the puzzle...");
251 break;
252 }
253 }
254 }
255
256 fn puzzle_iteration<R: Rng + CryptoRng>(
258 &self,
259 epoch_hash: N::BlockHash,
260 coinbase_target: u64,
261 proof_target: u64,
262 rng: &mut R,
263 ) -> Option<(u64, Solution<N>)> {
264 self.increment_puzzle_instances();
266
267 debug!(
268 "Proving 'Puzzle' for Epoch '{}' {}",
269 fmt_id(epoch_hash),
270 format!("(Coinbase Target {coinbase_target}, Proof Target {proof_target})").dimmed()
271 );
272
273 let result =
275 self.puzzle.prove(epoch_hash, self.address(), rng.r#gen(), Some(proof_target)).ok().and_then(|solution| {
276 self.puzzle.get_proof_target(&solution).ok().map(|solution_target| (solution_target, solution))
277 });
278
279 self.decrement_puzzle_instances();
281 result
283 }
284
285 fn broadcast_solution(&self, solution: Solution<N>) {
287 let message = Message::UnconfirmedSolution(UnconfirmedSolution {
289 solution_id: solution.id(),
290 solution: Data::Object(solution),
291 });
292 self.propagate(message, &[]);
294 }
295
296 fn num_puzzle_instances(&self) -> u8 {
298 self.puzzle_instances.load(Ordering::Relaxed)
299 }
300
301 fn increment_puzzle_instances(&self) {
303 self.puzzle_instances.fetch_add(1, Ordering::Relaxed);
304 #[cfg(debug_assertions)]
305 trace!("Number of Instances - {}", self.num_puzzle_instances());
306 }
307
308 fn decrement_puzzle_instances(&self) {
310 self.puzzle_instances.fetch_sub(1, Ordering::Relaxed);
311 #[cfg(debug_assertions)]
312 trace!("Number of Instances - {}", self.num_puzzle_instances());
313 }
314}