Skip to main content

snarkos_node/prover/
mod.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::{
19    bft::ledger_service::ProverLedgerService,
20    sync::{BlockSync, Ping},
21    traits::NodeInterface,
22};
23
24use snarkos_account::Account;
25use snarkos_node_network::{NodeType, PeerPoolHandling};
26use snarkos_node_router::{
27    Heartbeat,
28    Inbound,
29    Outbound,
30    Router,
31    Routing,
32    messages::{Message, UnconfirmedSolution},
33};
34use snarkos_node_tcp::{
35    P2P,
36    protocols::{Disconnect, Handshake, OnConnect, Reading},
37};
38use snarkos_utilities::{NodeDataDir, SignalHandler, Stoppable};
39
40use snarkvm::{
41    ledger::narwhal::Data,
42    prelude::{
43        Network,
44        block::{Block, Header},
45        puzzle::{Puzzle, Solution},
46        store::ConsensusStorage,
47    },
48    synthesizer::VM,
49};
50
51use anyhow::Result;
52use colored::Colorize;
53use core::{marker::PhantomData, time::Duration};
54#[cfg(feature = "locktick")]
55use locktick::parking_lot::{Mutex, RwLock};
56#[cfg(not(feature = "locktick"))]
57use parking_lot::{Mutex, RwLock};
58use rand::{CryptoRng, Rng, rngs::OsRng};
59use snarkos_node_bft::helpers::fmt_id;
60use std::{
61    net::SocketAddr,
62    sync::{
63        Arc,
64        atomic::{AtomicU8, Ordering},
65    },
66};
67use tokio::task::JoinHandle;
68
69/// A prover is a light node, capable of producing proofs for consensus.
70#[derive(Clone)]
71pub struct Prover<N: Network, C: ConsensusStorage<N>> {
72    /// The router of the node.
73    router: Router<N>,
74    /// The sync module.
75    sync: Arc<BlockSync<N>>,
76    /// The genesis block.
77    genesis: Block<N>,
78    /// The puzzle.
79    puzzle: Puzzle<N>,
80    /// The latest epoch hash.
81    latest_epoch_hash: Arc<RwLock<Option<N::BlockHash>>>,
82    /// The latest block header.
83    latest_block_header: Arc<RwLock<Option<Header<N>>>>,
84    /// The number of puzzle instances.
85    puzzle_instances: Arc<AtomicU8>,
86    /// The maximum number of puzzle instances.
87    max_puzzle_instances: u8,
88    /// The spawned handles.
89    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
90    /// Keeps track of sending pings.
91    ping: Arc<Ping<N>>,
92    /// The signal handling logic.
93    signal_handler: Arc<SignalHandler>,
94    /// PhantomData.
95    _phantom: PhantomData<C>,
96}
97
98impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
99    /// Initializes a new prover node.
100    pub async fn new(
101        node_ip: SocketAddr,
102        account: Account<N>,
103        trusted_peers: &[SocketAddr],
104        genesis: Block<N>,
105        node_data_dir: NodeDataDir,
106        trusted_peers_only: bool,
107        dev: Option<u16>,
108        signal_handler: Arc<SignalHandler>,
109    ) -> Result<Self> {
110        // Initialize the ledger service.
111        let ledger_service = Arc::new(ProverLedgerService::new());
112
113        // Initialize the node router.
114        let router = Router::new(
115            node_ip,
116            NodeType::Prover,
117            account,
118            ledger_service.clone(),
119            trusted_peers,
120            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
121            trusted_peers_only,
122            node_data_dir,
123            dev.is_some(),
124        )
125        .await?;
126
127        // Initialize the sync module.
128        let sync = BlockSync::new(ledger_service.clone());
129
130        // Set up the ping logic.
131        let ping = Arc::new(Ping::new_nosync(router.clone()));
132
133        // Compute the maximum number of puzzle instances.
134        let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
135        // Initialize the node.
136        let node = Self {
137            router,
138            sync: Arc::new(sync),
139            genesis,
140            puzzle: VM::<N, C>::new_puzzle()?,
141            latest_epoch_hash: Default::default(),
142            latest_block_header: Default::default(),
143            puzzle_instances: Default::default(),
144            max_puzzle_instances: u8::try_from(max_puzzle_instances)?,
145            handles: Default::default(),
146            ping,
147            signal_handler,
148            _phantom: Default::default(),
149        };
150        // Initialize the routing.
151        node.initialize_routing().await;
152        // Initialize the puzzle.
153        node.initialize_puzzle().await;
154        // Initialize the notification message loop.
155        node.handles.lock().push(crate::start_notification_message_loop());
156
157        // Return the node.
158        Ok(node)
159    }
160
161    pub fn router(&self) -> &Router<N> {
162        &self.router
163    }
164}
165
166#[async_trait]
167impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Prover<N, C> {
168    /// Shuts down the node.
169    async fn shut_down(&self) {
170        info!("Shutting down...");
171
172        // Shut down the puzzle.
173        debug!("Shutting down the puzzle...");
174
175        // Abort the tasks.
176        debug!("Shutting down the prover...");
177        self.handles.lock().iter().for_each(|handle| handle.abort());
178
179        // Shut down the router.
180        self.router.shut_down().await;
181
182        info!("Node has shut down.");
183    }
184}
185
186impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
187    /// Initialize a new instance of the puzzle.
188    async fn initialize_puzzle(&self) {
189        for _ in 0..self.max_puzzle_instances {
190            let prover = self.clone();
191            self.handles.lock().push(tokio::spawn(async move {
192                prover.puzzle_loop().await;
193            }));
194        }
195    }
196
197    /// Executes an instance of the puzzle.
198    async fn puzzle_loop(&self) {
199        loop {
200            // If the node is not connected to any peers, then skip this iteration.
201            if self.router.number_of_connected_peers() == 0 {
202                debug!("Skipping an iteration of the puzzle (no connected peers)");
203                tokio::time::sleep(Duration::from_secs(N::ANCHOR_TIME as u64)).await;
204                continue;
205            }
206
207            // If the number of instances of the puzzle exceeds the maximum, then skip this iteration.
208            if self.num_puzzle_instances() > self.max_puzzle_instances {
209                // Sleep for a brief period of time.
210                tokio::time::sleep(Duration::from_millis(500)).await;
211                continue;
212            }
213
214            // Read the latest epoch hash.
215            let latest_epoch_hash = *self.latest_epoch_hash.read();
216            // Read the latest state.
217            let latest_state = self
218                .latest_block_header
219                .read()
220                .as_ref()
221                .map(|header| (header.coinbase_target(), header.proof_target()));
222
223            // If the latest epoch hash and latest state exists, then proceed to generate a solution.
224            if let (Some(epoch_hash), Some((coinbase_target, proof_target))) = (latest_epoch_hash, latest_state) {
225                // Execute the puzzle.
226                let prover = self.clone();
227                let result = tokio::task::spawn_blocking(move || {
228                    prover.puzzle_iteration(epoch_hash, coinbase_target, proof_target, &mut OsRng)
229                })
230                .await;
231
232                // If the prover found a solution, then broadcast it.
233                if let Ok(Some((solution_target, solution))) = result {
234                    info!("Found a Solution '{}' (Proof Target {solution_target})", solution.id());
235                    // Broadcast the solution.
236                    self.broadcast_solution(solution);
237                }
238            } else {
239                // Otherwise, sleep for a brief period of time, to await for puzzle state.
240                tokio::time::sleep(Duration::from_secs(1)).await;
241            }
242
243            // If the Ctrl-C handler registered the signal, stop the prover.
244            if self.signal_handler.is_stopped() {
245                debug!("Shutting down the puzzle...");
246                break;
247            }
248        }
249    }
250
251    /// Performs one iteration of the puzzle.
252    fn puzzle_iteration<R: Rng + CryptoRng>(
253        &self,
254        epoch_hash: N::BlockHash,
255        coinbase_target: u64,
256        proof_target: u64,
257        rng: &mut R,
258    ) -> Option<(u64, Solution<N>)> {
259        // Increment the puzzle instances.
260        self.increment_puzzle_instances();
261
262        debug!(
263            "Proving 'Puzzle' for Epoch '{}' {}",
264            fmt_id(epoch_hash),
265            format!("(Coinbase Target {coinbase_target}, Proof Target {proof_target})").dimmed()
266        );
267
268        // Compute the solution.
269        let result =
270            self.puzzle.prove(epoch_hash, self.address(), rng.r#gen(), Some(proof_target)).ok().and_then(|solution| {
271                self.puzzle.get_proof_target(&solution).ok().map(|solution_target| (solution_target, solution))
272            });
273
274        // Decrement the puzzle instances.
275        self.decrement_puzzle_instances();
276        // Return the result.
277        result
278    }
279
280    /// Broadcasts the solution to the network.
281    fn broadcast_solution(&self, solution: Solution<N>) {
282        // Prepare the unconfirmed solution message.
283        let message = Message::UnconfirmedSolution(UnconfirmedSolution {
284            solution_id: solution.id(),
285            solution: Data::Object(solution),
286        });
287        // Propagate the "UnconfirmedSolution".
288        self.propagate(message, &[]);
289    }
290
291    /// Returns the current number of puzzle instances.
292    fn num_puzzle_instances(&self) -> u8 {
293        self.puzzle_instances.load(Ordering::Relaxed)
294    }
295
296    /// Increments the number of puzzle instances.
297    fn increment_puzzle_instances(&self) {
298        self.puzzle_instances.fetch_add(1, Ordering::Relaxed);
299        #[cfg(debug_assertions)]
300        trace!("Number of Instances - {}", self.num_puzzle_instances());
301    }
302
303    /// Decrements the number of puzzle instances.
304    fn decrement_puzzle_instances(&self) {
305        self.puzzle_instances.fetch_sub(1, Ordering::Relaxed);
306        #[cfg(debug_assertions)]
307        trace!("Number of Instances - {}", self.num_puzzle_instances());
308    }
309}