snarkos_node/prover/
mod.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::ledger_service::ProverLedgerService;
21use snarkos_node_router::{
22    Heartbeat,
23    Inbound,
24    Outbound,
25    PeerPoolHandling,
26    Router,
27    Routing,
28    messages::{Message, NodeType, UnconfirmedSolution},
29};
30use snarkos_node_sync::{BlockSync, Ping};
31use snarkos_node_tcp::{
32    P2P,
33    protocols::{Disconnect, Handshake, OnConnect, Reading},
34};
35use snarkvm::{
36    ledger::narwhal::Data,
37    prelude::{
38        Network,
39        block::{Block, Header},
40        puzzle::{Puzzle, Solution},
41        store::ConsensusStorage,
42    },
43    synthesizer::VM,
44};
45
46use aleo_std::StorageMode;
47use anyhow::Result;
48use colored::Colorize;
49use core::{marker::PhantomData, time::Duration};
50#[cfg(feature = "locktick")]
51use locktick::parking_lot::{Mutex, RwLock};
52#[cfg(not(feature = "locktick"))]
53use parking_lot::{Mutex, RwLock};
54use rand::{CryptoRng, Rng, rngs::OsRng};
55use snarkos_node_bft::helpers::fmt_id;
56use std::{
57    net::SocketAddr,
58    sync::{
59        Arc,
60        atomic::{AtomicBool, AtomicU8, Ordering},
61    },
62};
63use tokio::task::JoinHandle;
64
65/// A prover is a light node, capable of producing proofs for consensus.
66#[derive(Clone)]
67pub struct Prover<N: Network, C: ConsensusStorage<N>> {
68    /// The router of the node.
69    router: Router<N>,
70    /// The sync module.
71    sync: Arc<BlockSync<N>>,
72    /// The genesis block.
73    genesis: Block<N>,
74    /// The puzzle.
75    puzzle: Puzzle<N>,
76    /// The latest epoch hash.
77    latest_epoch_hash: Arc<RwLock<Option<N::BlockHash>>>,
78    /// The latest block header.
79    latest_block_header: Arc<RwLock<Option<Header<N>>>>,
80    /// The number of puzzle instances.
81    puzzle_instances: Arc<AtomicU8>,
82    /// The maximum number of puzzle instances.
83    max_puzzle_instances: u8,
84    /// The spawned handles.
85    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
86    /// The shutdown signal.
87    shutdown: Arc<AtomicBool>,
88    /// Keeps track of sending pings.
89    ping: Arc<Ping<N>>,
90    /// PhantomData.
91    _phantom: PhantomData<C>,
92}
93
94impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
95    /// Initializes a new prover node.
96    pub async fn new(
97        node_ip: SocketAddr,
98        account: Account<N>,
99        trusted_peers: &[SocketAddr],
100        genesis: Block<N>,
101        storage_mode: StorageMode,
102        dev: Option<u16>,
103        shutdown: Arc<AtomicBool>,
104    ) -> Result<Self> {
105        // Initialize the signal handler.
106        let signal_node = Self::handle_signals(shutdown.clone());
107
108        // Initialize the ledger service.
109        let ledger_service = Arc::new(ProverLedgerService::new());
110        // Determine if the prover should allow external peers.
111        let allow_external_peers = true;
112        // Determine if the prover should rotate external peers.
113        let rotate_external_peers = false;
114
115        // Initialize the node router.
116        let router = Router::new(
117            node_ip,
118            NodeType::Prover,
119            account,
120            ledger_service.clone(),
121            trusted_peers,
122            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
123            rotate_external_peers,
124            allow_external_peers,
125            storage_mode,
126            dev.is_some(),
127        )
128        .await?;
129
130        // Initialize the sync module.
131        let sync = BlockSync::new(ledger_service.clone());
132
133        // Set up the ping logic.
134        let ping = Arc::new(Ping::new_nosync(router.clone()));
135
136        // Compute the maximum number of puzzle instances.
137        let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
138        // Initialize the node.
139        let node = Self {
140            router,
141            sync: Arc::new(sync),
142            genesis,
143            puzzle: VM::<N, C>::new_puzzle()?,
144            latest_epoch_hash: Default::default(),
145            latest_block_header: Default::default(),
146            puzzle_instances: Default::default(),
147            max_puzzle_instances: u8::try_from(max_puzzle_instances)?,
148            handles: Default::default(),
149            ping,
150            shutdown,
151            _phantom: Default::default(),
152        };
153        // Initialize the routing.
154        node.initialize_routing().await;
155        // Initialize the puzzle.
156        node.initialize_puzzle().await;
157        // Initialize the notification message loop.
158        node.handles.lock().push(crate::start_notification_message_loop());
159        // Pass the node to the signal handler.
160        let _ = signal_node.set(node.clone());
161        // Return the node.
162        Ok(node)
163    }
164
165    pub fn router(&self) -> &Router<N> {
166        &self.router
167    }
168}
169
170#[async_trait]
171impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Prover<N, C> {
172    /// Shuts down the node.
173    async fn shut_down(&self) {
174        info!("Shutting down...");
175
176        // Shut down the puzzle.
177        debug!("Shutting down the puzzle...");
178        self.shutdown.store(true, Ordering::Release);
179
180        // Abort the tasks.
181        debug!("Shutting down the prover...");
182        self.handles.lock().iter().for_each(|handle| handle.abort());
183
184        // Shut down the router.
185        self.router.shut_down().await;
186
187        info!("Node has shut down.");
188    }
189}
190
191impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
192    /// Initialize a new instance of the puzzle.
193    async fn initialize_puzzle(&self) {
194        for _ in 0..self.max_puzzle_instances {
195            let prover = self.clone();
196            self.handles.lock().push(tokio::spawn(async move {
197                prover.puzzle_loop().await;
198            }));
199        }
200    }
201
202    /// Executes an instance of the puzzle.
203    async fn puzzle_loop(&self) {
204        loop {
205            // If the node is not connected to any peers, then skip this iteration.
206            if self.router.number_of_connected_peers() == 0 {
207                debug!("Skipping an iteration of the puzzle (no connected peers)");
208                tokio::time::sleep(Duration::from_secs(N::ANCHOR_TIME as u64)).await;
209                continue;
210            }
211
212            // If the number of instances of the puzzle exceeds the maximum, then skip this iteration.
213            if self.num_puzzle_instances() > self.max_puzzle_instances {
214                // Sleep for a brief period of time.
215                tokio::time::sleep(Duration::from_millis(500)).await;
216                continue;
217            }
218
219            // Read the latest epoch hash.
220            let latest_epoch_hash = *self.latest_epoch_hash.read();
221            // Read the latest state.
222            let latest_state = self
223                .latest_block_header
224                .read()
225                .as_ref()
226                .map(|header| (header.coinbase_target(), header.proof_target()));
227
228            // If the latest epoch hash and latest state exists, then proceed to generate a solution.
229            if let (Some(epoch_hash), Some((coinbase_target, proof_target))) = (latest_epoch_hash, latest_state) {
230                // Execute the puzzle.
231                let prover = self.clone();
232                let result = tokio::task::spawn_blocking(move || {
233                    prover.puzzle_iteration(epoch_hash, coinbase_target, proof_target, &mut OsRng)
234                })
235                .await;
236
237                // If the prover found a solution, then broadcast it.
238                if let Ok(Some((solution_target, solution))) = result {
239                    info!("Found a Solution '{}' (Proof Target {solution_target})", solution.id());
240                    // Broadcast the solution.
241                    self.broadcast_solution(solution);
242                }
243            } else {
244                // Otherwise, sleep for a brief period of time, to await for puzzle state.
245                tokio::time::sleep(Duration::from_secs(1)).await;
246            }
247
248            // If the Ctrl-C handler registered the signal, stop the prover.
249            if self.shutdown.load(Ordering::Acquire) {
250                debug!("Shutting down the puzzle...");
251                break;
252            }
253        }
254    }
255
256    /// Performs one iteration of the puzzle.
257    fn puzzle_iteration<R: Rng + CryptoRng>(
258        &self,
259        epoch_hash: N::BlockHash,
260        coinbase_target: u64,
261        proof_target: u64,
262        rng: &mut R,
263    ) -> Option<(u64, Solution<N>)> {
264        // Increment the puzzle instances.
265        self.increment_puzzle_instances();
266
267        debug!(
268            "Proving 'Puzzle' for Epoch '{}' {}",
269            fmt_id(epoch_hash),
270            format!("(Coinbase Target {coinbase_target}, Proof Target {proof_target})").dimmed()
271        );
272
273        // Compute the solution.
274        let result =
275            self.puzzle.prove(epoch_hash, self.address(), rng.r#gen(), Some(proof_target)).ok().and_then(|solution| {
276                self.puzzle.get_proof_target(&solution).ok().map(|solution_target| (solution_target, solution))
277            });
278
279        // Decrement the puzzle instances.
280        self.decrement_puzzle_instances();
281        // Return the result.
282        result
283    }
284
285    /// Broadcasts the solution to the network.
286    fn broadcast_solution(&self, solution: Solution<N>) {
287        // Prepare the unconfirmed solution message.
288        let message = Message::UnconfirmedSolution(UnconfirmedSolution {
289            solution_id: solution.id(),
290            solution: Data::Object(solution),
291        });
292        // Propagate the "UnconfirmedSolution".
293        self.propagate(message, &[]);
294    }
295
296    /// Returns the current number of puzzle instances.
297    fn num_puzzle_instances(&self) -> u8 {
298        self.puzzle_instances.load(Ordering::Relaxed)
299    }
300
301    /// Increments the number of puzzle instances.
302    fn increment_puzzle_instances(&self) {
303        self.puzzle_instances.fetch_add(1, Ordering::Relaxed);
304        #[cfg(debug_assertions)]
305        trace!("Number of Instances - {}", self.num_puzzle_instances());
306    }
307
308    /// Decrements the number of puzzle instances.
309    fn decrement_puzzle_instances(&self) {
310        self.puzzle_instances.fetch_sub(1, Ordering::Relaxed);
311        #[cfg(debug_assertions)]
312        trace!("Number of Instances - {}", self.num_puzzle_instances());
313    }
314}