snarkos_node/prover/
mod.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::ledger_service::ProverLedgerService;
21use snarkos_node_router::{
22    Heartbeat,
23    Inbound,
24    Outbound,
25    Router,
26    Routing,
27    messages::{Message, NodeType, UnconfirmedSolution},
28};
29use snarkos_node_sync::{BlockSync, Ping};
30use snarkos_node_tcp::{
31    P2P,
32    protocols::{Disconnect, Handshake, OnConnect, Reading},
33};
34use snarkvm::{
35    ledger::narwhal::Data,
36    prelude::{
37        Network,
38        block::{Block, Header},
39        puzzle::{Puzzle, Solution},
40        store::ConsensusStorage,
41    },
42    synthesizer::VM,
43};
44
45use anyhow::Result;
46use colored::Colorize;
47use core::{marker::PhantomData, time::Duration};
48#[cfg(feature = "locktick")]
49use locktick::parking_lot::{Mutex, RwLock};
50#[cfg(not(feature = "locktick"))]
51use parking_lot::{Mutex, RwLock};
52use rand::{CryptoRng, Rng, rngs::OsRng};
53use snarkos_node_bft::helpers::fmt_id;
54use std::{
55    net::SocketAddr,
56    sync::{
57        Arc,
58        atomic::{AtomicBool, AtomicU8, Ordering},
59    },
60};
61use tokio::task::JoinHandle;
62
63/// A prover is a light node, capable of producing proofs for consensus.
64#[derive(Clone)]
65pub struct Prover<N: Network, C: ConsensusStorage<N>> {
66    /// The router of the node.
67    router: Router<N>,
68    /// The sync module.
69    sync: Arc<BlockSync<N>>,
70    /// The genesis block.
71    genesis: Block<N>,
72    /// The puzzle.
73    puzzle: Puzzle<N>,
74    /// The latest epoch hash.
75    latest_epoch_hash: Arc<RwLock<Option<N::BlockHash>>>,
76    /// The latest block header.
77    latest_block_header: Arc<RwLock<Option<Header<N>>>>,
78    /// The number of puzzle instances.
79    puzzle_instances: Arc<AtomicU8>,
80    /// The maximum number of puzzle instances.
81    max_puzzle_instances: u8,
82    /// The spawned handles.
83    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
84    /// The shutdown signal.
85    shutdown: Arc<AtomicBool>,
86    /// Keeps track of sending pings.
87    ping: Arc<Ping<N>>,
88    /// PhantomData.
89    _phantom: PhantomData<C>,
90}
91
92impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
93    /// Initializes a new prover node.
94    pub async fn new(
95        node_ip: SocketAddr,
96        account: Account<N>,
97        trusted_peers: &[SocketAddr],
98        genesis: Block<N>,
99        dev: Option<u16>,
100        shutdown: Arc<AtomicBool>,
101    ) -> Result<Self> {
102        // Initialize the signal handler.
103        let signal_node = Self::handle_signals(shutdown.clone());
104
105        // Initialize the ledger service.
106        let ledger_service = Arc::new(ProverLedgerService::new());
107        // Determine if the prover should allow external peers.
108        let allow_external_peers = true;
109        // Determine if the prover should rotate external peers.
110        let rotate_external_peers = false;
111
112        // Initialize the node router.
113        let router = Router::new(
114            node_ip,
115            NodeType::Prover,
116            account,
117            ledger_service.clone(),
118            trusted_peers,
119            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
120            rotate_external_peers,
121            allow_external_peers,
122            dev.is_some(),
123        )
124        .await?;
125
126        // Initialize the sync module.
127        let sync = BlockSync::new(ledger_service.clone());
128
129        // Set up the ping logic.
130        let ping = Arc::new(Ping::new_nosync(router.clone()));
131
132        // Compute the maximum number of puzzle instances.
133        let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
134        // Initialize the node.
135        let node = Self {
136            router,
137            sync: Arc::new(sync),
138            genesis,
139            puzzle: VM::<N, C>::new_puzzle()?,
140            latest_epoch_hash: Default::default(),
141            latest_block_header: Default::default(),
142            puzzle_instances: Default::default(),
143            max_puzzle_instances: u8::try_from(max_puzzle_instances)?,
144            handles: Default::default(),
145            ping,
146            shutdown,
147            _phantom: Default::default(),
148        };
149        // Initialize the routing.
150        node.initialize_routing().await;
151        // Initialize the puzzle.
152        node.initialize_puzzle().await;
153        // Initialize the notification message loop.
154        node.handles.lock().push(crate::start_notification_message_loop());
155        // Pass the node to the signal handler.
156        let _ = signal_node.set(node.clone());
157        // Return the node.
158        Ok(node)
159    }
160
161    pub fn router(&self) -> &Router<N> {
162        &self.router
163    }
164}
165
166#[async_trait]
167impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Prover<N, C> {
168    /// Shuts down the node.
169    async fn shut_down(&self) {
170        info!("Shutting down...");
171
172        // Shut down the puzzle.
173        debug!("Shutting down the puzzle...");
174        self.shutdown.store(true, Ordering::Release);
175
176        // Abort the tasks.
177        debug!("Shutting down the prover...");
178        self.handles.lock().iter().for_each(|handle| handle.abort());
179
180        // Shut down the router.
181        self.router.shut_down().await;
182
183        info!("Node has shut down.");
184    }
185}
186
187impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
188    /// Initialize a new instance of the puzzle.
189    async fn initialize_puzzle(&self) {
190        for _ in 0..self.max_puzzle_instances {
191            let prover = self.clone();
192            self.handles.lock().push(tokio::spawn(async move {
193                prover.puzzle_loop().await;
194            }));
195        }
196    }
197
198    /// Executes an instance of the puzzle.
199    async fn puzzle_loop(&self) {
200        loop {
201            // If the node is not connected to any peers, then skip this iteration.
202            if self.router.number_of_connected_peers() == 0 {
203                debug!("Skipping an iteration of the puzzle (no connected peers)");
204                tokio::time::sleep(Duration::from_secs(N::ANCHOR_TIME as u64)).await;
205                continue;
206            }
207
208            // If the number of instances of the puzzle exceeds the maximum, then skip this iteration.
209            if self.num_puzzle_instances() > self.max_puzzle_instances {
210                // Sleep for a brief period of time.
211                tokio::time::sleep(Duration::from_millis(500)).await;
212                continue;
213            }
214
215            // Read the latest epoch hash.
216            let latest_epoch_hash = *self.latest_epoch_hash.read();
217            // Read the latest state.
218            let latest_state = self
219                .latest_block_header
220                .read()
221                .as_ref()
222                .map(|header| (header.coinbase_target(), header.proof_target()));
223
224            // If the latest epoch hash and latest state exists, then proceed to generate a solution.
225            if let (Some(epoch_hash), Some((coinbase_target, proof_target))) = (latest_epoch_hash, latest_state) {
226                // Execute the puzzle.
227                let prover = self.clone();
228                let result = tokio::task::spawn_blocking(move || {
229                    prover.puzzle_iteration(epoch_hash, coinbase_target, proof_target, &mut OsRng)
230                })
231                .await;
232
233                // If the prover found a solution, then broadcast it.
234                if let Ok(Some((solution_target, solution))) = result {
235                    info!("Found a Solution '{}' (Proof Target {solution_target})", solution.id());
236                    // Broadcast the solution.
237                    self.broadcast_solution(solution);
238                }
239            } else {
240                // Otherwise, sleep for a brief period of time, to await for puzzle state.
241                tokio::time::sleep(Duration::from_secs(1)).await;
242            }
243
244            // If the Ctrl-C handler registered the signal, stop the prover.
245            if self.shutdown.load(Ordering::Acquire) {
246                debug!("Shutting down the puzzle...");
247                break;
248            }
249        }
250    }
251
252    /// Performs one iteration of the puzzle.
253    fn puzzle_iteration<R: Rng + CryptoRng>(
254        &self,
255        epoch_hash: N::BlockHash,
256        coinbase_target: u64,
257        proof_target: u64,
258        rng: &mut R,
259    ) -> Option<(u64, Solution<N>)> {
260        // Increment the puzzle instances.
261        self.increment_puzzle_instances();
262
263        debug!(
264            "Proving 'Puzzle' for Epoch '{}' {}",
265            fmt_id(epoch_hash),
266            format!("(Coinbase Target {coinbase_target}, Proof Target {proof_target})").dimmed()
267        );
268
269        // Compute the solution.
270        let result =
271            self.puzzle.prove(epoch_hash, self.address(), rng.r#gen(), Some(proof_target)).ok().and_then(|solution| {
272                self.puzzle.get_proof_target(&solution).ok().map(|solution_target| (solution_target, solution))
273            });
274
275        // Decrement the puzzle instances.
276        self.decrement_puzzle_instances();
277        // Return the result.
278        result
279    }
280
281    /// Broadcasts the solution to the network.
282    fn broadcast_solution(&self, solution: Solution<N>) {
283        // Prepare the unconfirmed solution message.
284        let message = Message::UnconfirmedSolution(UnconfirmedSolution {
285            solution_id: solution.id(),
286            solution: Data::Object(solution),
287        });
288        // Propagate the "UnconfirmedSolution".
289        self.propagate(message, &[]);
290    }
291
292    /// Returns the current number of puzzle instances.
293    fn num_puzzle_instances(&self) -> u8 {
294        self.puzzle_instances.load(Ordering::Relaxed)
295    }
296
297    /// Increments the number of puzzle instances.
298    fn increment_puzzle_instances(&self) {
299        self.puzzle_instances.fetch_add(1, Ordering::Relaxed);
300        #[cfg(debug_assertions)]
301        trace!("Number of Instances - {}", self.num_puzzle_instances());
302    }
303
304    /// Decrements the number of puzzle instances.
305    fn decrement_puzzle_instances(&self) {
306        self.puzzle_instances.fetch_sub(1, Ordering::Relaxed);
307        #[cfg(debug_assertions)]
308        trace!("Number of Instances - {}", self.num_puzzle_instances());
309    }
310}