1mod router;
17
18use crate::{
19 bft::ledger_service::ProverLedgerService,
20 sync::{BlockSync, Ping},
21 traits::NodeInterface,
22};
23
24use snarkos_account::Account;
25use snarkos_node_network::{NodeType, PeerPoolHandling};
26use snarkos_node_router::{
27 Heartbeat,
28 Inbound,
29 Outbound,
30 Router,
31 Routing,
32 messages::{Message, UnconfirmedSolution},
33};
34use snarkos_node_tcp::{
35 P2P,
36 protocols::{Disconnect, Handshake, OnConnect, Reading},
37};
38use snarkos_utilities::{NodeDataDir, SignalHandler, Stoppable};
39
40use snarkvm::{
41 ledger::narwhal::Data,
42 prelude::{
43 Network,
44 block::{Block, Header},
45 puzzle::{Puzzle, Solution},
46 store::ConsensusStorage,
47 },
48 synthesizer::VM,
49};
50
51use anyhow::Result;
52use colored::Colorize;
53use core::{marker::PhantomData, time::Duration};
54#[cfg(feature = "locktick")]
55use locktick::parking_lot::{Mutex, RwLock};
56#[cfg(not(feature = "locktick"))]
57use parking_lot::{Mutex, RwLock};
58use rand::{CryptoRng, Rng, rngs::OsRng};
59use snarkos_node_bft::helpers::fmt_id;
60use std::{
61 net::SocketAddr,
62 sync::{
63 Arc,
64 atomic::{AtomicU8, Ordering},
65 },
66};
67use tokio::task::JoinHandle;
68
69#[derive(Clone)]
71pub struct Prover<N: Network, C: ConsensusStorage<N>> {
72 router: Router<N>,
74 sync: Arc<BlockSync<N>>,
76 genesis: Block<N>,
78 puzzle: Puzzle<N>,
80 latest_epoch_hash: Arc<RwLock<Option<N::BlockHash>>>,
82 latest_block_header: Arc<RwLock<Option<Header<N>>>>,
84 puzzle_instances: Arc<AtomicU8>,
86 max_puzzle_instances: u8,
88 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
90 ping: Arc<Ping<N>>,
92 signal_handler: Arc<SignalHandler>,
94 _phantom: PhantomData<C>,
96}
97
98impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
99 pub async fn new(
101 node_ip: SocketAddr,
102 account: Account<N>,
103 trusted_peers: &[SocketAddr],
104 genesis: Block<N>,
105 node_data_dir: NodeDataDir,
106 trusted_peers_only: bool,
107 dev: Option<u16>,
108 signal_handler: Arc<SignalHandler>,
109 ) -> Result<Self> {
110 let ledger_service = Arc::new(ProverLedgerService::new());
112
113 let router = Router::new(
115 node_ip,
116 NodeType::Prover,
117 account,
118 ledger_service.clone(),
119 trusted_peers,
120 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
121 trusted_peers_only,
122 node_data_dir,
123 dev.is_some(),
124 )
125 .await?;
126
127 let sync = BlockSync::new(ledger_service.clone());
129
130 let ping = Arc::new(Ping::new_nosync(router.clone()));
132
133 let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
135 let node = Self {
137 router,
138 sync: Arc::new(sync),
139 genesis,
140 puzzle: VM::<N, C>::new_puzzle()?,
141 latest_epoch_hash: Default::default(),
142 latest_block_header: Default::default(),
143 puzzle_instances: Default::default(),
144 max_puzzle_instances: u8::try_from(max_puzzle_instances)?,
145 handles: Default::default(),
146 ping,
147 signal_handler,
148 _phantom: Default::default(),
149 };
150 node.initialize_routing().await;
152 node.initialize_puzzle().await;
154 node.handles.lock().push(crate::start_notification_message_loop());
156
157 Ok(node)
159 }
160
161 pub fn router(&self) -> &Router<N> {
162 &self.router
163 }
164}
165
166#[async_trait]
167impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Prover<N, C> {
168 async fn shut_down(&self) {
170 info!("Shutting down...");
171
172 debug!("Shutting down the puzzle...");
174
175 debug!("Shutting down the prover...");
177 self.handles.lock().iter().for_each(|handle| handle.abort());
178
179 self.router.shut_down().await;
181
182 info!("Node has shut down.");
183 }
184}
185
186impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
187 async fn initialize_puzzle(&self) {
189 for _ in 0..self.max_puzzle_instances {
190 let prover = self.clone();
191 self.handles.lock().push(tokio::spawn(async move {
192 prover.puzzle_loop().await;
193 }));
194 }
195 }
196
197 async fn puzzle_loop(&self) {
199 loop {
200 if self.router.number_of_connected_peers() == 0 {
202 debug!("Skipping an iteration of the puzzle (no connected peers)");
203 tokio::time::sleep(Duration::from_secs(N::ANCHOR_TIME as u64)).await;
204 continue;
205 }
206
207 if self.num_puzzle_instances() > self.max_puzzle_instances {
209 tokio::time::sleep(Duration::from_millis(500)).await;
211 continue;
212 }
213
214 let latest_epoch_hash = *self.latest_epoch_hash.read();
216 let latest_state = self
218 .latest_block_header
219 .read()
220 .as_ref()
221 .map(|header| (header.coinbase_target(), header.proof_target()));
222
223 if let (Some(epoch_hash), Some((coinbase_target, proof_target))) = (latest_epoch_hash, latest_state) {
225 let prover = self.clone();
227 let result = tokio::task::spawn_blocking(move || {
228 prover.puzzle_iteration(epoch_hash, coinbase_target, proof_target, &mut OsRng)
229 })
230 .await;
231
232 if let Ok(Some((solution_target, solution))) = result {
234 info!("Found a Solution '{}' (Proof Target {solution_target})", solution.id());
235 self.broadcast_solution(solution);
237 }
238 } else {
239 tokio::time::sleep(Duration::from_secs(1)).await;
241 }
242
243 if self.signal_handler.is_stopped() {
245 debug!("Shutting down the puzzle...");
246 break;
247 }
248 }
249 }
250
251 fn puzzle_iteration<R: Rng + CryptoRng>(
253 &self,
254 epoch_hash: N::BlockHash,
255 coinbase_target: u64,
256 proof_target: u64,
257 rng: &mut R,
258 ) -> Option<(u64, Solution<N>)> {
259 self.increment_puzzle_instances();
261
262 debug!(
263 "Proving 'Puzzle' for Epoch '{}' {}",
264 fmt_id(epoch_hash),
265 format!("(Coinbase Target {coinbase_target}, Proof Target {proof_target})").dimmed()
266 );
267
268 let result =
270 self.puzzle.prove(epoch_hash, self.address(), rng.r#gen(), Some(proof_target)).ok().and_then(|solution| {
271 self.puzzle.get_proof_target(&solution).ok().map(|solution_target| (solution_target, solution))
272 });
273
274 self.decrement_puzzle_instances();
276 result
278 }
279
280 fn broadcast_solution(&self, solution: Solution<N>) {
282 let message = Message::UnconfirmedSolution(UnconfirmedSolution {
284 solution_id: solution.id(),
285 solution: Data::Object(solution),
286 });
287 self.propagate(message, &[]);
289 }
290
291 fn num_puzzle_instances(&self) -> u8 {
293 self.puzzle_instances.load(Ordering::Relaxed)
294 }
295
296 fn increment_puzzle_instances(&self) {
298 self.puzzle_instances.fetch_add(1, Ordering::Relaxed);
299 #[cfg(debug_assertions)]
300 trace!("Number of Instances - {}", self.num_puzzle_instances());
301 }
302
303 fn decrement_puzzle_instances(&self) {
305 self.puzzle_instances.fetch_sub(1, Ordering::Relaxed);
306 #[cfg(debug_assertions)]
307 trace!("Number of Instances - {}", self.num_puzzle_instances());
308 }
309}