1mod router;
17
18use crate::{
19 bft::ledger_service::ProverLedgerService,
20 sync::{BlockSync, Ping},
21 traits::NodeInterface,
22};
23
24use snarkos_account::Account;
25use snarkos_node_network::{ConnectionMode, NodeType, PeerPoolHandling};
26use snarkos_node_router::{
27 Heartbeat,
28 Inbound,
29 Outbound,
30 Router,
31 Routing,
32 messages::{Message, UnconfirmedSolution},
33};
34use snarkos_node_tcp::{
35 P2P,
36 protocols::{Disconnect, Handshake, OnConnect, Reading},
37};
38use snarkos_utilities::{NodeDataDir, SignalHandler, Stoppable};
39
40use snarkvm::{
41 console::network::consensus_config_value,
42 ledger::narwhal::Data,
43 prelude::{
44 Network,
45 block::{Block, Header},
46 puzzle::{Puzzle, Solution},
47 store::ConsensusStorage,
48 },
49 synthesizer::VM,
50};
51
52use anyhow::Result;
53use colored::Colorize;
54use core::{marker::PhantomData, time::Duration};
55#[cfg(feature = "locktick")]
56use locktick::parking_lot::{Mutex, RwLock};
57#[cfg(not(feature = "locktick"))]
58use parking_lot::{Mutex, RwLock};
59use rand::{CryptoRng, Rng, RngExt};
60use snarkos_node_bft::helpers::fmt_id;
61use std::{
62 net::SocketAddr,
63 sync::{
64 Arc,
65 atomic::{AtomicU8, Ordering},
66 },
67};
68use tokio::task::JoinHandle;
69
70#[derive(Clone)]
72pub struct Prover<N: Network, C: ConsensusStorage<N>> {
73 router: Router<N>,
75 sync: Arc<BlockSync<N>>,
77 genesis: Block<N>,
79 puzzle: Puzzle<N>,
81 latest_epoch_hash: Arc<RwLock<Option<N::BlockHash>>>,
83 latest_block_header: Arc<RwLock<Option<Header<N>>>>,
85 puzzle_instances: Arc<AtomicU8>,
87 max_puzzle_instances: u8,
89 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
91 ping: Arc<Ping<N>>,
93 signal_handler: Arc<SignalHandler>,
95 _phantom: PhantomData<C>,
97}
98
99impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
100 pub async fn new(
102 node_ip: SocketAddr,
103 account: Account<N>,
104 trusted_peers: &[SocketAddr],
105 genesis: Block<N>,
106 node_data_dir: NodeDataDir,
107 trusted_peers_only: bool,
108 dev: Option<u16>,
109 signal_handler: Arc<SignalHandler>,
110 ) -> Result<Self> {
111 let ledger_service = Arc::new(ProverLedgerService::new());
113
114 let router = Router::new(
116 node_ip,
117 NodeType::Prover,
118 account,
119 ledger_service.clone(),
120 trusted_peers,
121 Self::MAXIMUM_NUMBER_OF_PEERS as u16,
122 trusted_peers_only,
123 node_data_dir,
124 dev.is_some(),
125 )
126 .await?;
127
128 let sync = BlockSync::new(ledger_service.clone(), ConnectionMode::Router);
130
131 let ping = Arc::new(Ping::new_nosync(router.clone()));
133
134 let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
136 let node = Self {
138 router,
139 sync: Arc::new(sync),
140 genesis,
141 puzzle: VM::<N, C>::new_puzzle()?,
142 latest_epoch_hash: Default::default(),
143 latest_block_header: Default::default(),
144 puzzle_instances: Default::default(),
145 max_puzzle_instances: u8::try_from(max_puzzle_instances)?,
146 handles: Default::default(),
147 ping,
148 signal_handler,
149 _phantom: Default::default(),
150 };
151 node.initialize_routing().await;
153 node.initialize_puzzle().await;
155 node.handles.lock().push(crate::start_notification_message_loop());
157
158 Ok(node)
160 }
161
162 pub fn router(&self) -> &Router<N> {
163 &self.router
164 }
165}
166
167#[async_trait]
168impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Prover<N, C> {
169 async fn shut_down(&self) {
171 info!("Shutting down...");
172
173 debug!("Shutting down the puzzle...");
175
176 debug!("Shutting down the prover...");
178 self.handles.lock().iter().for_each(|handle| handle.abort());
179
180 self.router.shut_down().await;
182
183 info!("Node has shut down.");
184 }
185}
186
187impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
188 async fn initialize_puzzle(&self) {
190 for _ in 0..self.max_puzzle_instances {
191 let prover = self.clone();
192 self.handles.lock().push(tokio::spawn(async move {
193 prover.puzzle_loop().await;
194 }));
195 }
196 }
197
198 async fn puzzle_loop(&self) {
200 loop {
201 if self.router.number_of_connected_peers() == 0 {
203 debug!("Skipping an iteration of the puzzle (no connected peers)");
204 let anchor_time = self
205 .latest_block_header
206 .read()
207 .as_ref()
208 .and_then(|header| consensus_config_value!(N, ANCHOR_TIMES, header.height()))
209 .unwrap_or_else(|| N::ANCHOR_TIMES.last().unwrap().1);
210 tokio::time::sleep(Duration::from_secs(anchor_time as u64)).await;
211 continue;
212 }
213
214 if self.num_puzzle_instances() > self.max_puzzle_instances {
216 tokio::time::sleep(Duration::from_millis(500)).await;
218 continue;
219 }
220
221 let latest_epoch_hash = *self.latest_epoch_hash.read();
223 let latest_state = self
225 .latest_block_header
226 .read()
227 .as_ref()
228 .map(|header| (header.coinbase_target(), header.proof_target()));
229
230 if let (Some(epoch_hash), Some((coinbase_target, proof_target))) = (latest_epoch_hash, latest_state) {
232 let prover = self.clone();
234 let result = tokio::task::spawn_blocking(move || {
235 prover.puzzle_iteration(epoch_hash, coinbase_target, proof_target, &mut rand::rng())
236 })
237 .await;
238
239 if let Ok(Some((solution_target, solution))) = result {
241 info!("Found a Solution '{}' (Proof Target {solution_target})", solution.id());
242 self.broadcast_solution(solution);
244 }
245 } else {
246 tokio::time::sleep(Duration::from_secs(1)).await;
248 }
249
250 if self.signal_handler.is_stopped() {
252 debug!("Shutting down the puzzle...");
253 break;
254 }
255 }
256 }
257
258 fn puzzle_iteration<R: Rng + CryptoRng>(
260 &self,
261 epoch_hash: N::BlockHash,
262 coinbase_target: u64,
263 proof_target: u64,
264 rng: &mut R,
265 ) -> Option<(u64, Solution<N>)> {
266 self.increment_puzzle_instances();
268
269 debug!(
270 "Proving 'Puzzle' for Epoch '{}' {}",
271 fmt_id(epoch_hash),
272 format!("(Coinbase Target {coinbase_target}, Proof Target {proof_target})").dimmed()
273 );
274
275 let result =
277 self.puzzle.prove(epoch_hash, self.address(), rng.random(), Some(proof_target)).ok().and_then(|solution| {
278 self.puzzle.get_proof_target(&solution).ok().map(|solution_target| (solution_target, solution))
279 });
280
281 self.decrement_puzzle_instances();
283 result
285 }
286
287 fn broadcast_solution(&self, solution: Solution<N>) {
289 let message = Message::UnconfirmedSolution(UnconfirmedSolution {
291 solution_id: solution.id(),
292 solution: Data::Object(solution),
293 });
294 self.propagate(message, &[]);
296 }
297
298 fn num_puzzle_instances(&self) -> u8 {
300 self.puzzle_instances.load(Ordering::Relaxed)
301 }
302
303 fn increment_puzzle_instances(&self) {
305 self.puzzle_instances.fetch_add(1, Ordering::Relaxed);
306 #[cfg(debug_assertions)]
307 trace!("Number of Instances - {}", self.num_puzzle_instances());
308 }
309
310 fn decrement_puzzle_instances(&self) {
312 self.puzzle_instances.fetch_sub(1, Ordering::Relaxed);
313 #[cfg(debug_assertions)]
314 trace!("Number of Instances - {}", self.num_puzzle_instances());
315 }
316}