1mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::ledger_service::ProverLedgerService;
21use snarkos_node_network::{NodeType, PeerPoolHandling};
22use snarkos_node_router::{
23 Heartbeat,
24 Inbound,
25 Outbound,
26 Router,
27 Routing,
28 messages::{Message, UnconfirmedSolution},
29};
30use snarkos_node_sync::{BlockSync, Ping};
31use snarkos_node_tcp::{
32 P2P,
33 protocols::{Disconnect, Handshake, OnConnect, Reading},
34};
35use snarkvm::{
36 ledger::narwhal::Data,
37 prelude::{
38 Network,
39 block::{Block, Header},
40 puzzle::{Puzzle, Solution},
41 store::ConsensusStorage,
42 },
43 synthesizer::VM,
44};
45
46use aleo_std::StorageMode;
47use anyhow::Result;
48use colored::Colorize;
49use core::{marker::PhantomData, time::Duration};
50#[cfg(feature = "locktick")]
51use locktick::parking_lot::{Mutex, RwLock};
52#[cfg(not(feature = "locktick"))]
53use parking_lot::{Mutex, RwLock};
54use rand::{CryptoRng, Rng, rngs::OsRng};
55use snarkos_node_bft::helpers::fmt_id;
56use std::{
57 net::SocketAddr,
58 sync::{
59 Arc,
60 atomic::{AtomicBool, AtomicU8, Ordering},
61 },
62};
63use tokio::task::JoinHandle;
64
65#[derive(Clone)]
67pub struct Prover<N: Network, C: ConsensusStorage<N>> {
68 router: Router<N>,
70 sync: Arc<BlockSync<N>>,
72 genesis: Block<N>,
74 puzzle: Puzzle<N>,
76 latest_epoch_hash: Arc<RwLock<Option<N::BlockHash>>>,
78 latest_block_header: Arc<RwLock<Option<Header<N>>>>,
80 puzzle_instances: Arc<AtomicU8>,
82 max_puzzle_instances: u8,
84 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
86 shutdown: Arc<AtomicBool>,
88 ping: Arc<Ping<N>>,
90 _phantom: PhantomData<C>,
92}
93
94impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
95 pub async fn new(
97 node_ip: SocketAddr,
98 account: Account<N>,
99 trusted_peers: &[SocketAddr],
100 genesis: Block<N>,
101 storage_mode: StorageMode,
102 trusted_peers_only: bool,
103 dev: Option<u16>,
104 shutdown: Arc<AtomicBool>,
105 ) -> Result<Self> {
106 let signal_node = Self::handle_signals(shutdown.clone());
108
109 let ledger_service = Arc::new(ProverLedgerService::new());
111
112 let router = Router::new(
114 node_ip,
115 NodeType::Prover,
116 account,
117 ledger_service.clone(),
118 trusted_peers,
119 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
120 trusted_peers_only,
121 storage_mode,
122 dev.is_some(),
123 )
124 .await?;
125
126 let sync = BlockSync::new(ledger_service.clone());
128
129 let ping = Arc::new(Ping::new_nosync(router.clone()));
131
132 let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
134 let node = Self {
136 router,
137 sync: Arc::new(sync),
138 genesis,
139 puzzle: VM::<N, C>::new_puzzle()?,
140 latest_epoch_hash: Default::default(),
141 latest_block_header: Default::default(),
142 puzzle_instances: Default::default(),
143 max_puzzle_instances: u8::try_from(max_puzzle_instances)?,
144 handles: Default::default(),
145 ping,
146 shutdown,
147 _phantom: Default::default(),
148 };
149 node.initialize_routing().await;
151 node.initialize_puzzle().await;
153 node.handles.lock().push(crate::start_notification_message_loop());
155 let _ = signal_node.set(node.clone());
157 Ok(node)
159 }
160
161 pub fn router(&self) -> &Router<N> {
162 &self.router
163 }
164}
165
166#[async_trait]
167impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Prover<N, C> {
168 async fn shut_down(&self) {
170 info!("Shutting down...");
171
172 debug!("Shutting down the puzzle...");
174 self.shutdown.store(true, Ordering::Release);
175
176 debug!("Shutting down the prover...");
178 self.handles.lock().iter().for_each(|handle| handle.abort());
179
180 self.router.shut_down().await;
182
183 info!("Node has shut down.");
184 }
185}
186
187impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
188 async fn initialize_puzzle(&self) {
190 for _ in 0..self.max_puzzle_instances {
191 let prover = self.clone();
192 self.handles.lock().push(tokio::spawn(async move {
193 prover.puzzle_loop().await;
194 }));
195 }
196 }
197
198 async fn puzzle_loop(&self) {
200 loop {
201 if self.router.number_of_connected_peers() == 0 {
203 debug!("Skipping an iteration of the puzzle (no connected peers)");
204 tokio::time::sleep(Duration::from_secs(N::ANCHOR_TIME as u64)).await;
205 continue;
206 }
207
208 if self.num_puzzle_instances() > self.max_puzzle_instances {
210 tokio::time::sleep(Duration::from_millis(500)).await;
212 continue;
213 }
214
215 let latest_epoch_hash = *self.latest_epoch_hash.read();
217 let latest_state = self
219 .latest_block_header
220 .read()
221 .as_ref()
222 .map(|header| (header.coinbase_target(), header.proof_target()));
223
224 if let (Some(epoch_hash), Some((coinbase_target, proof_target))) = (latest_epoch_hash, latest_state) {
226 let prover = self.clone();
228 let result = tokio::task::spawn_blocking(move || {
229 prover.puzzle_iteration(epoch_hash, coinbase_target, proof_target, &mut OsRng)
230 })
231 .await;
232
233 if let Ok(Some((solution_target, solution))) = result {
235 info!("Found a Solution '{}' (Proof Target {solution_target})", solution.id());
236 self.broadcast_solution(solution);
238 }
239 } else {
240 tokio::time::sleep(Duration::from_secs(1)).await;
242 }
243
244 if self.shutdown.load(Ordering::Acquire) {
246 debug!("Shutting down the puzzle...");
247 break;
248 }
249 }
250 }
251
252 fn puzzle_iteration<R: Rng + CryptoRng>(
254 &self,
255 epoch_hash: N::BlockHash,
256 coinbase_target: u64,
257 proof_target: u64,
258 rng: &mut R,
259 ) -> Option<(u64, Solution<N>)> {
260 self.increment_puzzle_instances();
262
263 debug!(
264 "Proving 'Puzzle' for Epoch '{}' {}",
265 fmt_id(epoch_hash),
266 format!("(Coinbase Target {coinbase_target}, Proof Target {proof_target})").dimmed()
267 );
268
269 let result =
271 self.puzzle.prove(epoch_hash, self.address(), rng.r#gen(), Some(proof_target)).ok().and_then(|solution| {
272 self.puzzle.get_proof_target(&solution).ok().map(|solution_target| (solution_target, solution))
273 });
274
275 self.decrement_puzzle_instances();
277 result
279 }
280
281 fn broadcast_solution(&self, solution: Solution<N>) {
283 let message = Message::UnconfirmedSolution(UnconfirmedSolution {
285 solution_id: solution.id(),
286 solution: Data::Object(solution),
287 });
288 self.propagate(message, &[]);
290 }
291
292 fn num_puzzle_instances(&self) -> u8 {
294 self.puzzle_instances.load(Ordering::Relaxed)
295 }
296
297 fn increment_puzzle_instances(&self) {
299 self.puzzle_instances.fetch_add(1, Ordering::Relaxed);
300 #[cfg(debug_assertions)]
301 trace!("Number of Instances - {}", self.num_puzzle_instances());
302 }
303
304 fn decrement_puzzle_instances(&self) {
306 self.puzzle_instances.fetch_sub(1, Ordering::Relaxed);
307 #[cfg(debug_assertions)]
308 trace!("Number of Instances - {}", self.num_puzzle_instances());
309 }
310}