snarkos_node/prover/
mod.rs

1// Copyright 2024-2025 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod router;
17
18use crate::traits::NodeInterface;
19use snarkos_account::Account;
20use snarkos_node_bft::ledger_service::ProverLedgerService;
21use snarkos_node_router::{
22    Heartbeat,
23    Inbound,
24    Outbound,
25    Router,
26    Routing,
27    messages::{Message, NodeType, UnconfirmedSolution},
28};
29use snarkos_node_sync::{BlockSync, BlockSyncMode};
30use snarkos_node_tcp::{
31    P2P,
32    protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
33};
34use snarkvm::{
35    ledger::narwhal::Data,
36    prelude::{
37        Network,
38        block::{Block, Header},
39        puzzle::{Puzzle, Solution},
40        store::ConsensusStorage,
41    },
42    synthesizer::VM,
43};
44
45use aleo_std::StorageMode;
46use anyhow::Result;
47use colored::Colorize;
48use core::{marker::PhantomData, time::Duration};
49#[cfg(feature = "locktick")]
50use locktick::parking_lot::{Mutex, RwLock};
51#[cfg(not(feature = "locktick"))]
52use parking_lot::{Mutex, RwLock};
53use rand::{CryptoRng, Rng, rngs::OsRng};
54use snarkos_node_bft::helpers::fmt_id;
55use std::{
56    net::SocketAddr,
57    sync::{
58        Arc,
59        atomic::{AtomicBool, AtomicU8, Ordering},
60    },
61};
62use tokio::task::JoinHandle;
63
/// A prover is a light node, capable of producing proofs for consensus.
///
/// The struct is `Clone`; the fields that hold mutable state are wrapped in `Arc`,
/// so clones of a prover (e.g. the ones moved into spawned puzzle tasks) share that state.
#[derive(Clone)]
pub struct Prover<N: Network, C: ConsensusStorage<N>> {
    /// The router of the node.
    router: Router<N>,
    /// The sync module, constructed in `BlockSyncMode::Router` mode by `Prover::new`.
    sync: Arc<BlockSync<N>>,
    /// The genesis block, as supplied to `Prover::new`.
    genesis: Block<N>,
    /// The puzzle, used to generate solutions and to compute their proof targets.
    puzzle: Puzzle<N>,
    /// The latest epoch hash. While this is `None`, the puzzle loop does not attempt to prove.
    latest_epoch_hash: Arc<RwLock<Option<N::BlockHash>>>,
    /// The latest block header. The puzzle loop reads the coinbase target and proof target from it;
    /// while this is `None`, the puzzle loop does not attempt to prove.
    latest_block_header: Arc<RwLock<Option<Header<N>>>>,
    /// The number of puzzle instances (i.e. puzzle iterations) that are currently executing.
    puzzle_instances: Arc<AtomicU8>,
    /// The maximum number of puzzle instances. This is also the number of puzzle loops that are spawned.
    max_puzzle_instances: u8,
    /// The spawned handles, which are aborted when the node shuts down.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The shutdown signal. It is set when the node shuts down, and is polled by the puzzle loop.
    shutdown: Arc<AtomicBool>,
    /// PhantomData, as the consensus storage type `C` is not stored in any other field.
    _phantom: PhantomData<C>,
}
90
91impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
92    /// Initializes a new prover node.
93    pub async fn new(
94        node_ip: SocketAddr,
95        account: Account<N>,
96        trusted_peers: &[SocketAddr],
97        genesis: Block<N>,
98        storage_mode: StorageMode,
99        shutdown: Arc<AtomicBool>,
100    ) -> Result<Self> {
101        // Initialize the signal handler.
102        let signal_node = Self::handle_signals(shutdown.clone());
103
104        // Initialize the ledger service.
105        let ledger_service = Arc::new(ProverLedgerService::new());
106        // Determine if the prover should allow external peers.
107        let allow_external_peers = true;
108        // Determine if the prover should rotate external peers.
109        let rotate_external_peers = false;
110
111        // Initialize the node router.
112        let router = Router::new(
113            node_ip,
114            NodeType::Prover,
115            account,
116            trusted_peers,
117            Self::MAXIMUM_NUMBER_OF_PEERS as u16,
118            rotate_external_peers,
119            allow_external_peers,
120            matches!(storage_mode, StorageMode::Development(_)),
121        )
122        .await?;
123
124        // Initialize the sync module.
125        let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone(), router.tcp().clone());
126
127        // Compute the maximum number of puzzle instances.
128        let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
129        // Initialize the node.
130        let node = Self {
131            router,
132            sync: Arc::new(sync),
133            genesis,
134            puzzle: VM::<N, C>::new_puzzle()?,
135            latest_epoch_hash: Default::default(),
136            latest_block_header: Default::default(),
137            puzzle_instances: Default::default(),
138            max_puzzle_instances: u8::try_from(max_puzzle_instances)?,
139            handles: Default::default(),
140            shutdown,
141            _phantom: Default::default(),
142        };
143        // Initialize the routing.
144        node.initialize_routing().await;
145        // Initialize the puzzle.
146        node.initialize_puzzle().await;
147        // Initialize the notification message loop.
148        node.handles.lock().push(crate::start_notification_message_loop());
149        // Pass the node to the signal handler.
150        let _ = signal_node.set(node.clone());
151        // Return the node.
152        Ok(node)
153    }
154}
155
156#[async_trait]
157impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Prover<N, C> {
158    /// Shuts down the node.
159    async fn shut_down(&self) {
160        info!("Shutting down...");
161
162        // Shut down the puzzle.
163        debug!("Shutting down the puzzle...");
164        self.shutdown.store(true, Ordering::Release);
165
166        // Abort the tasks.
167        debug!("Shutting down the prover...");
168        self.handles.lock().iter().for_each(|handle| handle.abort());
169
170        // Shut down the router.
171        self.router.shut_down().await;
172
173        info!("Node has shut down.");
174    }
175}
176
177impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
178    /// Initialize a new instance of the puzzle.
179    async fn initialize_puzzle(&self) {
180        for _ in 0..self.max_puzzle_instances {
181            let prover = self.clone();
182            self.handles.lock().push(tokio::spawn(async move {
183                prover.puzzle_loop().await;
184            }));
185        }
186    }
187
188    /// Executes an instance of the puzzle.
189    async fn puzzle_loop(&self) {
190        loop {
191            // If the node is not connected to any peers, then skip this iteration.
192            if self.router.number_of_connected_peers() == 0 {
193                debug!("Skipping an iteration of the puzzle (no connected peers)");
194                tokio::time::sleep(Duration::from_secs(N::ANCHOR_TIME as u64)).await;
195                continue;
196            }
197
198            // If the number of instances of the puzzle exceeds the maximum, then skip this iteration.
199            if self.num_puzzle_instances() > self.max_puzzle_instances {
200                // Sleep for a brief period of time.
201                tokio::time::sleep(Duration::from_millis(500)).await;
202                continue;
203            }
204
205            // Read the latest epoch hash.
206            let latest_epoch_hash = *self.latest_epoch_hash.read();
207            // Read the latest state.
208            let latest_state = self
209                .latest_block_header
210                .read()
211                .as_ref()
212                .map(|header| (header.coinbase_target(), header.proof_target()));
213
214            // If the latest epoch hash and latest state exists, then proceed to generate a solution.
215            if let (Some(epoch_hash), Some((coinbase_target, proof_target))) = (latest_epoch_hash, latest_state) {
216                // Execute the puzzle.
217                let prover = self.clone();
218                let result = tokio::task::spawn_blocking(move || {
219                    prover.puzzle_iteration(epoch_hash, coinbase_target, proof_target, &mut OsRng)
220                })
221                .await;
222
223                // If the prover found a solution, then broadcast it.
224                if let Ok(Some((solution_target, solution))) = result {
225                    info!("Found a Solution '{}' (Proof Target {solution_target})", solution.id());
226                    // Broadcast the solution.
227                    self.broadcast_solution(solution);
228                }
229            } else {
230                // Otherwise, sleep for a brief period of time, to await for puzzle state.
231                tokio::time::sleep(Duration::from_secs(1)).await;
232            }
233
234            // If the Ctrl-C handler registered the signal, stop the prover.
235            if self.shutdown.load(Ordering::Acquire) {
236                debug!("Shutting down the puzzle...");
237                break;
238            }
239        }
240    }
241
242    /// Performs one iteration of the puzzle.
243    fn puzzle_iteration<R: Rng + CryptoRng>(
244        &self,
245        epoch_hash: N::BlockHash,
246        coinbase_target: u64,
247        proof_target: u64,
248        rng: &mut R,
249    ) -> Option<(u64, Solution<N>)> {
250        // Increment the puzzle instances.
251        self.increment_puzzle_instances();
252
253        debug!(
254            "Proving 'Puzzle' for Epoch '{}' {}",
255            fmt_id(epoch_hash),
256            format!("(Coinbase Target {coinbase_target}, Proof Target {proof_target})").dimmed()
257        );
258
259        // Compute the solution.
260        let result =
261            self.puzzle.prove(epoch_hash, self.address(), rng.gen(), Some(proof_target)).ok().and_then(|solution| {
262                self.puzzle.get_proof_target(&solution).ok().map(|solution_target| (solution_target, solution))
263            });
264
265        // Decrement the puzzle instances.
266        self.decrement_puzzle_instances();
267        // Return the result.
268        result
269    }
270
271    /// Broadcasts the solution to the network.
272    fn broadcast_solution(&self, solution: Solution<N>) {
273        // Prepare the unconfirmed solution message.
274        let message = Message::UnconfirmedSolution(UnconfirmedSolution {
275            solution_id: solution.id(),
276            solution: Data::Object(solution),
277        });
278        // Propagate the "UnconfirmedSolution".
279        self.propagate(message, &[]);
280    }
281
282    /// Returns the current number of puzzle instances.
283    fn num_puzzle_instances(&self) -> u8 {
284        self.puzzle_instances.load(Ordering::Relaxed)
285    }
286
287    /// Increments the number of puzzle instances.
288    fn increment_puzzle_instances(&self) {
289        self.puzzle_instances.fetch_add(1, Ordering::Relaxed);
290        #[cfg(debug_assertions)]
291        trace!("Number of Instances - {}", self.num_puzzle_instances());
292    }
293
294    /// Decrements the number of puzzle instances.
295    fn decrement_puzzle_instances(&self) {
296        self.puzzle_instances.fetch_sub(1, Ordering::Relaxed);
297        #[cfg(debug_assertions)]
298        trace!("Number of Instances - {}", self.num_puzzle_instances());
299    }
300}