Skip to main content

snarkos_node/prover/
mod.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::{
19    bft::ledger_service::ProverLedgerService,
20    sync::{BlockSync, Ping},
21    traits::NodeInterface,
22};
23
24use snarkos_account::Account;
25use snarkos_node_network::{ConnectionMode, NodeType, PeerPoolHandling};
26use snarkos_node_router::{
27    Heartbeat,
28    Inbound,
29    Outbound,
30    Router,
31    Routing,
32    messages::{Message, UnconfirmedSolution},
33};
34use snarkos_node_tcp::{
35    P2P,
36    protocols::{Disconnect, Handshake, OnConnect, Reading},
37};
38use snarkos_utilities::{NodeDataDir, SignalHandler, Stoppable};
39
40use snarkvm::{
41    console::network::consensus_config_value,
42    ledger::narwhal::Data,
43    prelude::{
44        Network,
45        block::{Block, Header},
46        puzzle::{Puzzle, Solution},
47        store::ConsensusStorage,
48    },
49    synthesizer::VM,
50};
51
52use anyhow::Result;
53use colored::Colorize;
54use core::{marker::PhantomData, time::Duration};
55#[cfg(feature = "locktick")]
56use locktick::parking_lot::{Mutex, RwLock};
57#[cfg(not(feature = "locktick"))]
58use parking_lot::{Mutex, RwLock};
59use rand::{CryptoRng, Rng, RngExt};
60use snarkos_node_bft::helpers::fmt_id;
61use std::{
62    net::SocketAddr,
63    sync::{
64        Arc,
65        atomic::{AtomicU8, Ordering},
66    },
67};
68use tokio::task::JoinHandle;
69
70/// A prover is a light node, capable of producing proofs for consensus.
71#[derive(Clone)]
72pub struct Prover<N: Network, C: ConsensusStorage<N>> {
73    /// The router of the node.
74    router: Router<N>,
75    /// The sync module.
76    sync: Arc<BlockSync<N>>,
77    /// The genesis block.
78    genesis: Block<N>,
79    /// The puzzle.
80    puzzle: Puzzle<N>,
81    /// The latest epoch hash.
82    latest_epoch_hash: Arc<RwLock<Option<N::BlockHash>>>,
83    /// The latest block header.
84    latest_block_header: Arc<RwLock<Option<Header<N>>>>,
85    /// The number of puzzle instances.
86    puzzle_instances: Arc<AtomicU8>,
87    /// The maximum number of puzzle instances.
88    max_puzzle_instances: u8,
89    /// The spawned handles.
90    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
91    /// Keeps track of sending pings.
92    ping: Arc<Ping<N>>,
93    /// The signal handling logic.
94    signal_handler: Arc<SignalHandler>,
95    /// PhantomData.
96    _phantom: PhantomData<C>,
97}
98
99impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
100    /// Initializes a new prover node.
101    pub async fn new(
102        node_ip: SocketAddr,
103        account: Account<N>,
104        trusted_peers: &[SocketAddr],
105        genesis: Block<N>,
106        node_data_dir: NodeDataDir,
107        trusted_peers_only: bool,
108        dev: Option<u16>,
109        signal_handler: Arc<SignalHandler>,
110    ) -> Result<Self> {
111        // Initialize the ledger service.
112        let ledger_service = Arc::new(ProverLedgerService::new());
113
114        // Initialize the node router.
115        let router = Router::new(
116            node_ip,
117            NodeType::Prover,
118            account,
119            ledger_service.clone(),
120            trusted_peers,
121            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
122            trusted_peers_only,
123            node_data_dir,
124            dev.is_some(),
125        )
126        .await?;
127
128        // Initialize the sync module.
129        let sync = BlockSync::new(ledger_service.clone(), ConnectionMode::Router);
130
131        // Set up the ping logic.
132        let ping = Arc::new(Ping::new_nosync(router.clone()));
133
134        // Compute the maximum number of puzzle instances.
135        let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
136        // Initialize the node.
137        let node = Self {
138            router,
139            sync: Arc::new(sync),
140            genesis,
141            puzzle: VM::<N, C>::new_puzzle()?,
142            latest_epoch_hash: Default::default(),
143            latest_block_header: Default::default(),
144            puzzle_instances: Default::default(),
145            max_puzzle_instances: u8::try_from(max_puzzle_instances)?,
146            handles: Default::default(),
147            ping,
148            signal_handler,
149            _phantom: Default::default(),
150        };
151        // Initialize the routing.
152        node.initialize_routing().await;
153        // Initialize the puzzle.
154        node.initialize_puzzle().await;
155        // Initialize the notification message loop.
156        node.handles.lock().push(crate::start_notification_message_loop());
157
158        // Return the node.
159        Ok(node)
160    }
161
162    pub fn router(&self) -> &Router<N> {
163        &self.router
164    }
165}
166
167#[async_trait]
168impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Prover<N, C> {
169    /// Shuts down the node.
170    async fn shut_down(&self) {
171        info!("Shutting down...");
172
173        // Shut down the puzzle.
174        debug!("Shutting down the puzzle...");
175
176        // Abort the tasks.
177        debug!("Shutting down the prover...");
178        self.handles.lock().iter().for_each(|handle| handle.abort());
179
180        // Shut down the router.
181        self.router.shut_down().await;
182
183        info!("Node has shut down.");
184    }
185}
186
187impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
188    /// Initialize a new instance of the puzzle.
189    async fn initialize_puzzle(&self) {
190        for _ in 0..self.max_puzzle_instances {
191            let prover = self.clone();
192            self.handles.lock().push(tokio::spawn(async move {
193                prover.puzzle_loop().await;
194            }));
195        }
196    }
197
198    /// Executes an instance of the puzzle.
199    async fn puzzle_loop(&self) {
200        loop {
201            // If the node is not connected to any peers, then skip this iteration.
202            if self.router.number_of_connected_peers() == 0 {
203                debug!("Skipping an iteration of the puzzle (no connected peers)");
204                let anchor_time = self
205                    .latest_block_header
206                    .read()
207                    .as_ref()
208                    .and_then(|header| consensus_config_value!(N, ANCHOR_TIMES, header.height()))
209                    .unwrap_or_else(|| N::ANCHOR_TIMES.last().unwrap().1);
210                tokio::time::sleep(Duration::from_secs(anchor_time as u64)).await;
211                continue;
212            }
213
214            // If the number of instances of the puzzle exceeds the maximum, then skip this iteration.
215            if self.num_puzzle_instances() > self.max_puzzle_instances {
216                // Sleep for a brief period of time.
217                tokio::time::sleep(Duration::from_millis(500)).await;
218                continue;
219            }
220
221            // Read the latest epoch hash.
222            let latest_epoch_hash = *self.latest_epoch_hash.read();
223            // Read the latest state.
224            let latest_state = self
225                .latest_block_header
226                .read()
227                .as_ref()
228                .map(|header| (header.coinbase_target(), header.proof_target()));
229
230            // If the latest epoch hash and latest state exists, then proceed to generate a solution.
231            if let (Some(epoch_hash), Some((coinbase_target, proof_target))) = (latest_epoch_hash, latest_state) {
232                // Execute the puzzle.
233                let prover = self.clone();
234                let result = tokio::task::spawn_blocking(move || {
235                    prover.puzzle_iteration(epoch_hash, coinbase_target, proof_target, &mut rand::rng())
236                })
237                .await;
238
239                // If the prover found a solution, then broadcast it.
240                if let Ok(Some((solution_target, solution))) = result {
241                    info!("Found a Solution '{}' (Proof Target {solution_target})", solution.id());
242                    // Broadcast the solution.
243                    self.broadcast_solution(solution);
244                }
245            } else {
246                // Otherwise, sleep for a brief period of time, to await for puzzle state.
247                tokio::time::sleep(Duration::from_secs(1)).await;
248            }
249
250            // If the Ctrl-C handler registered the signal, stop the prover.
251            if self.signal_handler.is_stopped() {
252                debug!("Shutting down the puzzle...");
253                break;
254            }
255        }
256    }
257
258    /// Performs one iteration of the puzzle.
259    fn puzzle_iteration<R: Rng + CryptoRng>(
260        &self,
261        epoch_hash: N::BlockHash,
262        coinbase_target: u64,
263        proof_target: u64,
264        rng: &mut R,
265    ) -> Option<(u64, Solution<N>)> {
266        // Increment the puzzle instances.
267        self.increment_puzzle_instances();
268
269        debug!(
270            "Proving 'Puzzle' for Epoch '{}' {}",
271            fmt_id(epoch_hash),
272            format!("(Coinbase Target {coinbase_target}, Proof Target {proof_target})").dimmed()
273        );
274
275        // Compute the solution.
276        let result =
277            self.puzzle.prove(epoch_hash, self.address(), rng.random(), Some(proof_target)).ok().and_then(|solution| {
278                self.puzzle.get_proof_target(&solution).ok().map(|solution_target| (solution_target, solution))
279            });
280
281        // Decrement the puzzle instances.
282        self.decrement_puzzle_instances();
283        // Return the result.
284        result
285    }
286
287    /// Broadcasts the solution to the network.
288    fn broadcast_solution(&self, solution: Solution<N>) {
289        // Prepare the unconfirmed solution message.
290        let message = Message::UnconfirmedSolution(UnconfirmedSolution {
291            solution_id: solution.id(),
292            solution: Data::Object(solution),
293        });
294        // Propagate the "UnconfirmedSolution".
295        self.propagate(message, &[]);
296    }
297
298    /// Returns the current number of puzzle instances.
299    fn num_puzzle_instances(&self) -> u8 {
300        self.puzzle_instances.load(Ordering::Relaxed)
301    }
302
303    /// Increments the number of puzzle instances.
304    fn increment_puzzle_instances(&self) {
305        self.puzzle_instances.fetch_add(1, Ordering::Relaxed);
306        #[cfg(debug_assertions)]
307        trace!("Number of Instances - {}", self.num_puzzle_instances());
308    }
309
310    /// Decrements the number of puzzle instances.
311    fn decrement_puzzle_instances(&self) {
312        self.puzzle_instances.fetch_sub(1, Ordering::Relaxed);
313        #[cfg(debug_assertions)]
314        trace!("Number of Instances - {}", self.num_puzzle_instances());
315    }
316}