mod router;
use crate::traits::NodeInterface;
use snarkos_account::Account;
use snarkos_node_bft::ledger_service::ProverLedgerService;
use snarkos_node_router::{
messages::{Message, NodeType, UnconfirmedSolution},
Heartbeat,
Inbound,
Outbound,
Router,
Routing,
};
use snarkos_node_sync::{BlockSync, BlockSyncMode};
use snarkos_node_tcp::{
protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
P2P,
};
use snarkvm::{
ledger::narwhal::Data,
prelude::{
block::{Block, Header},
puzzle::{Puzzle, Solution},
store::ConsensusStorage,
Network,
},
synthesizer::VM,
};
use aleo_std::StorageMode;
use anyhow::Result;
use colored::Colorize;
use core::{marker::PhantomData, time::Duration};
use parking_lot::{Mutex, RwLock};
use rand::{rngs::OsRng, CryptoRng, Rng};
use snarkos_node_bft::helpers::fmt_id;
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, AtomicU8, Ordering},
Arc,
},
};
use tokio::task::JoinHandle;
/// A prover node: it connects to the network through a router and runs puzzle
/// workers that search for solutions to broadcast.
///
/// The type is cheap to clone in the sense that all mutable state lives behind
/// `Arc`s, so clones handed to spawned tasks share that state.
#[derive(Clone)]
pub struct Prover<N, C>
where
    N: Network,
    C: ConsensusStorage<N>,
{
    /// The router of the node.
    router: Router<N>,
    /// The block sync module.
    sync: Arc<BlockSync<N>>,
    /// The genesis block supplied at construction.
    genesis: Block<N>,
    /// The puzzle used to produce solutions.
    puzzle: Puzzle<N>,
    /// The latest epoch hash, read by the puzzle loop (`None` until one is set).
    latest_epoch_hash: Arc<RwLock<Option<N::BlockHash>>>,
    /// The latest block header, read by the puzzle loop (`None` until one is set).
    latest_block_header: Arc<RwLock<Option<Header<N>>>>,
    /// The number of puzzle iterations currently in progress.
    puzzle_instances: Arc<AtomicU8>,
    /// The maximum number of puzzle instances; also the number of worker tasks spawned.
    max_puzzle_instances: u8,
    /// The join handles of the spawned tasks, aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The shutdown flag, polled by the puzzle loop.
    shutdown: Arc<AtomicBool>,
    /// Marker for the consensus storage type `C`, which is not stored directly.
    _phantom: PhantomData<C>,
}
impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
pub async fn new(
node_ip: SocketAddr,
account: Account<N>,
trusted_peers: &[SocketAddr],
genesis: Block<N>,
storage_mode: StorageMode,
shutdown: Arc<AtomicBool>,
) -> Result<Self> {
let signal_node = Self::handle_signals(shutdown.clone());
let ledger_service = Arc::new(ProverLedgerService::new());
let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone());
let allow_external_peers = true;
let router = Router::new(
node_ip,
NodeType::Prover,
account,
trusted_peers,
Self::MAXIMUM_NUMBER_OF_PEERS as u16,
allow_external_peers,
matches!(storage_mode, StorageMode::Development(_)),
)
.await?;
let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
let node = Self {
router,
sync: Arc::new(sync),
genesis,
puzzle: VM::<N, C>::new_puzzle()?,
latest_epoch_hash: Default::default(),
latest_block_header: Default::default(),
puzzle_instances: Default::default(),
max_puzzle_instances: u8::try_from(max_puzzle_instances)?,
handles: Default::default(),
shutdown,
_phantom: Default::default(),
};
node.initialize_routing().await;
node.initialize_puzzle().await;
node.handles.lock().push(crate::start_notification_message_loop());
let _ = signal_node.set(node.clone());
Ok(node)
}
}
#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Prover<N, C> {
    /// Shuts down the node.
    ///
    /// Raises the shutdown flag, aborts every task recorded in `handles`, and
    /// then shuts down the router.
    async fn shut_down(&self) {
        info!("Shutting down...");

        // Raise the flag that the puzzle loop polls.
        debug!("Shutting down the puzzle...");
        self.shutdown.store(true, Ordering::Relaxed);

        debug!("Shutting down the prover...");
        {
            // Scope the guard so the lock is released before the `.await` below.
            let handles = self.handles.lock();
            for handle in handles.iter() {
                handle.abort();
            }
        }

        self.router.shut_down().await;
        info!("Node has shut down.");
    }
}
impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
    /// Spawns the puzzle workers.
    ///
    /// One task running [`Self::puzzle_loop`] is spawned per allowed puzzle
    /// instance (`max_puzzle_instances`). Each join handle is stored in
    /// `handles` so the task can be aborted on shutdown.
    async fn initialize_puzzle(&self) {
        for _ in 0..self.max_puzzle_instances {
            let prover = self.clone();
            self.handles.lock().push(tokio::spawn(async move {
                prover.puzzle_loop().await;
            }));
        }
    }

    /// Runs a single puzzle worker until the shutdown flag is set.
    ///
    /// Each pass:
    /// 1. sleeps for `N::ANCHOR_TIME` seconds if no peers are connected;
    /// 2. sleeps for 500 ms if the instance counter exceeds the maximum;
    /// 3. otherwise, if both the latest epoch hash and the latest block header
    ///    are available, runs one puzzle iteration on the blocking thread pool
    ///    and broadcasts the solution if one was found;
    /// 4. otherwise, sleeps for one second to wait for puzzle state.
    async fn puzzle_loop(&self) {
        // The shutdown flag is the loop condition (rather than a check at the end
        // of the body) so that the early `continue` paths below also observe a
        // shutdown request instead of sleeping and looping indefinitely.
        while !self.shutdown.load(Ordering::Relaxed) {
            // Without connected peers there is nobody to send a solution to.
            if self.router.number_of_connected_peers() == 0 {
                debug!("Skipping an iteration of the puzzle (no connected peers)");
                tokio::time::sleep(Duration::from_secs(N::ANCHOR_TIME as u64)).await;
                continue;
            }

            // Back off briefly while the number of running instances exceeds the maximum.
            if self.num_puzzle_instances() > self.max_puzzle_instances {
                tokio::time::sleep(Duration::from_millis(500)).await;
                continue;
            }

            // Snapshot the puzzle state; each read guard is released at the end
            // of its statement, so no lock is held across an `.await`.
            let latest_epoch_hash = *self.latest_epoch_hash.read();
            let latest_state = self
                .latest_block_header
                .read()
                .as_ref()
                .map(|header| (header.coinbase_target(), header.proof_target()));

            if let (Some(epoch_hash), Some((coinbase_target, proof_target))) = (latest_epoch_hash, latest_state) {
                // Proving is run via `spawn_blocking` to keep it off the async workers.
                let prover = self.clone();
                let result = tokio::task::spawn_blocking(move || {
                    prover.puzzle_iteration(epoch_hash, coinbase_target, proof_target, &mut OsRng)
                })
                .await;

                // A join error or a `None` result means there is nothing to broadcast.
                if let Ok(Some((solution_target, solution))) = result {
                    info!("Found a Solution '{}' (Proof Target {solution_target})", solution.id());
                    self.broadcast_solution(solution);
                }
            } else {
                // The puzzle state is not available yet; wait before trying again.
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
        debug!("Shutting down the puzzle...");
    }

    /// Performs one proving attempt for the given epoch.
    ///
    /// The instance counter is incremented for the duration of the attempt.
    /// `coinbase_target` is used for logging only; `proof_target` is passed to
    /// the puzzle's `prove` call.
    ///
    /// Returns the solution together with its proof target, or `None` if either
    /// proving or computing the proof target fails.
    fn puzzle_iteration<R: Rng + CryptoRng>(
        &self,
        epoch_hash: N::BlockHash,
        coinbase_target: u64,
        proof_target: u64,
        rng: &mut R,
    ) -> Option<(u64, Solution<N>)> {
        self.increment_puzzle_instances();

        debug!(
            "Proving 'Puzzle' for Epoch '{}' {}",
            fmt_id(epoch_hash),
            format!("(Coinbase Target {coinbase_target}, Proof Target {proof_target})").dimmed()
        );

        // Errors from either call are turned into `None`.
        let result =
            self.puzzle.prove(epoch_hash, self.address(), rng.gen(), Some(proof_target)).ok().and_then(|solution| {
                self.puzzle.get_proof_target(&solution).ok().map(|solution_target| (solution_target, solution))
            });

        self.decrement_puzzle_instances();
        result
    }

    /// Wraps the solution in an `UnconfirmedSolution` message and propagates it.
    fn broadcast_solution(&self, solution: Solution<N>) {
        let message = Message::UnconfirmedSolution(UnconfirmedSolution {
            solution_id: solution.id(),
            solution: Data::Object(solution),
        });
        // NOTE(review): the empty slice is presumably the list of excluded peers - confirm against `propagate`.
        self.propagate(message, &[]);
    }

    /// Returns the current value of the puzzle instance counter.
    fn num_puzzle_instances(&self) -> u8 {
        self.puzzle_instances.load(Ordering::Relaxed)
    }

    /// Increments the puzzle instance counter (and traces the new value in debug builds).
    fn increment_puzzle_instances(&self) {
        self.puzzle_instances.fetch_add(1, Ordering::Relaxed);
        #[cfg(debug_assertions)]
        trace!("Number of Instances - {}", self.num_puzzle_instances());
    }

    /// Decrements the puzzle instance counter (and traces the new value in debug builds).
    fn decrement_puzzle_instances(&self) {
        self.puzzle_instances.fetch_sub(1, Ordering::Relaxed);
        #[cfg(debug_assertions)]
        trace!("Number of Instances - {}", self.num_puzzle_instances());
    }
}