mod router;
use crate::traits::NodeInterface;
use snarkos_account::Account;
use snarkos_node_bft::ledger_service::ProverLedgerService;
use snarkos_node_router::{
messages::{Message, NodeType, UnconfirmedSolution},
Heartbeat,
Inbound,
Outbound,
Router,
Routing,
};
use snarkos_node_sync::{BlockSync, BlockSyncMode};
use snarkos_node_tcp::{
protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
P2P,
};
use snarkvm::{
ledger::narwhal::Data,
prelude::{
block::{Block, Header},
coinbase::{CoinbasePuzzle, EpochChallenge, ProverSolution},
store::ConsensusStorage,
Network,
},
};
use anyhow::Result;
use colored::Colorize;
use core::{marker::PhantomData, time::Duration};
use parking_lot::{Mutex, RwLock};
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, AtomicU8, Ordering},
Arc,
},
};
use tokio::task::JoinHandle;
/// A prover node: runs coinbase puzzle workers and broadcasts any solutions it finds.
///
/// The struct is `Clone`; mutable state is held behind `Arc`, so clones handed to
/// spawned tasks observe the same counters, flags, and task handles.
#[derive(Clone)]
pub struct Prover<N: Network, C: ConsensusStorage<N>> {
/// The router of the node; the puzzle loop queries its connected-peer count and
/// `shut_down` shuts it down.
router: Router<N>,
/// The block sync module, created in `new` with `BlockSyncMode::Router`.
sync: Arc<BlockSync<N>>,
/// The genesis block supplied to `new`.
genesis: Block<N>,
/// The coinbase puzzle, loaded in `new` and used to produce prover solutions.
coinbase_puzzle: CoinbasePuzzle<N>,
/// The latest epoch challenge, if one is known; read on every pass of the puzzle loop.
// NOTE(review): no writer is visible in this file section — presumably set by the
// inbound message handlers in the `router` submodule; confirm.
latest_epoch_challenge: Arc<RwLock<Option<Arc<EpochChallenge<N>>>>>,
/// The latest block header, if one is known; the puzzle loop takes the coinbase
/// target and proof target from it.
// NOTE(review): no writer is visible in this file section — confirm where it is updated.
latest_block_header: Arc<RwLock<Option<Header<N>>>>,
/// The number of proving attempts currently in flight; incremented and decremented
/// around each attempt.
puzzle_instances: Arc<AtomicU8>,
/// The number of puzzle loops spawned, and the threshold above which a loop backs off.
/// Computed in `new` as the CPU count minus two, clamped to `1..=6`.
max_puzzle_instances: u8,
/// The handles of the spawned background tasks; aborted in `shut_down`.
handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
/// The shutdown flag; set in `shut_down` and polled by the puzzle loops.
shutdown: Arc<AtomicBool>,
/// Marker for the consensus storage type `C`, which no field stores directly.
_phantom: PhantomData<C>,
}
impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
pub async fn new(
node_ip: SocketAddr,
account: Account<N>,
trusted_peers: &[SocketAddr],
genesis: Block<N>,
dev: Option<u16>,
) -> Result<Self> {
let signal_node = Self::handle_signals();
let ledger_service = Arc::new(ProverLedgerService::new());
let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone());
let router = Router::new(
node_ip,
NodeType::Prover,
account,
trusted_peers,
Self::MAXIMUM_NUMBER_OF_PEERS as u16,
dev.is_some(),
)
.await?;
let coinbase_puzzle = CoinbasePuzzle::<N>::load()?;
let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
let node = Self {
router,
sync: Arc::new(sync),
genesis,
coinbase_puzzle,
latest_epoch_challenge: Default::default(),
latest_block_header: Default::default(),
puzzle_instances: Default::default(),
max_puzzle_instances: u8::try_from(max_puzzle_instances)?,
handles: Default::default(),
shutdown: Default::default(),
_phantom: Default::default(),
};
node.initialize_routing().await;
node.initialize_coinbase_puzzle().await;
node.handles.lock().push(crate::start_notification_message_loop());
let _ = signal_node.set(node.clone());
Ok(node)
}
}
#[async_trait]
impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Prover<N, C> {
    /// Shuts down the node: raises the shutdown flag, aborts the spawned tasks, and then
    /// shuts down the router.
    async fn shut_down(&self) {
        info!("Shutting down...");

        // Signal the coinbase puzzle workers to stop.
        trace!("Shutting down the coinbase puzzle...");
        self.shutdown.store(true, Ordering::Relaxed);

        // Abort every spawned task. The lock guard is confined to this scope so that it is
        // released before the router is awaited.
        trace!("Shutting down the prover...");
        {
            let tasks = self.handles.lock();
            for task in tasks.iter() {
                task.abort();
            }
        }

        // Shut down the router.
        self.router.shut_down().await;

        info!("Node has shut down.");
    }
}
impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
/// Spawns `max_puzzle_instances` coinbase puzzle workers and records their task handles.
async fn initialize_coinbase_puzzle(&self) {
    (0..self.max_puzzle_instances).for_each(|_| {
        // Each worker owns its own clone of the prover.
        let worker = self.clone();
        let task = tokio::spawn(async move { worker.coinbase_puzzle_loop().await });
        self.handles.lock().push(task);
    });
}
/// Runs one coinbase puzzle worker until the shutdown flag is set.
///
/// On each pass the worker:
/// 1. stops if shutdown has been requested;
/// 2. backs off if there are no connected peers, or if the number of running puzzle
///    instances exceeds `max_puzzle_instances`;
/// 3. otherwise snapshots the latest epoch challenge and the targets of the latest block
///    header, runs one proving attempt on tokio's blocking thread pool, and broadcasts the
///    solution if one was found. If either piece of state is missing, it waits one second.
async fn coinbase_puzzle_loop(&self) {
    // The shutdown flag is the loop condition, so it is observed on every pass, including
    // the passes that take one of the `continue` back-off paths below.
    while !self.shutdown.load(Ordering::Relaxed) {
        // If the node is not connected to any peers, skip this pass.
        if self.router.number_of_connected_peers() == 0 {
            trace!("Skipping an iteration of the coinbase puzzle (no connected peers)");
            tokio::time::sleep(Duration::from_secs(N::ANCHOR_TIME as u64)).await;
            continue;
        }

        // If too many puzzle instances are running, wait briefly and try again.
        if self.num_puzzle_instances() > self.max_puzzle_instances {
            tokio::time::sleep(Duration::from_millis(500)).await;
            continue;
        }

        // Snapshot the puzzle state. Each read guard is released at the end of its
        // statement, so no lock is held across an `.await`.
        let latest_epoch_challenge = self.latest_epoch_challenge.read().clone();
        let latest_state = self
            .latest_block_header
            .read()
            .as_ref()
            .map(|header| (header.coinbase_target(), header.proof_target()));

        if let (Some(challenge), Some((coinbase_target, proof_target))) = (latest_epoch_challenge, latest_state) {
            // Proving is CPU-bound, so run it off the async executor.
            let prover = self.clone();
            let result = tokio::task::spawn_blocking(move || {
                prover.coinbase_puzzle_iteration(&challenge, coinbase_target, proof_target, &mut OsRng)
            })
            .await;

            // If the attempt produced a solution, broadcast it.
            if let Ok(Some((solution_target, solution))) = result {
                info!("Found a Solution '{}' (Proof Target {solution_target})", solution.commitment());
                self.broadcast_prover_solution(solution);
            }
        } else {
            // Otherwise, wait briefly for the puzzle state to become available.
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
    trace!("Shutting down the coinbase puzzle...");
}
fn coinbase_puzzle_iteration<R: Rng + CryptoRng>(
&self,
epoch_challenge: &EpochChallenge<N>,
coinbase_target: u64,
proof_target: u64,
rng: &mut R,
) -> Option<(u64, ProverSolution<N>)> {
self.increment_puzzle_instances();
trace!(
"Proving 'CoinbasePuzzle' {}",
format!(
"(Epoch {}, Coinbase Target {coinbase_target}, Proof Target {proof_target})",
epoch_challenge.epoch_number(),
)
.dimmed()
);
let result = self
.coinbase_puzzle
.prove(epoch_challenge, self.address(), rng.gen(), Some(proof_target))
.ok()
.and_then(|solution| solution.to_target().ok().map(|solution_target| (solution_target, solution)));
self.decrement_puzzle_instances();
result
}
/// Wraps the prover solution in an `UnconfirmedSolution` message and propagates it,
/// with no peers excluded.
fn broadcast_prover_solution(&self, prover_solution: ProverSolution<N>) {
    // Take the commitment before the solution is moved into the message payload.
    let solution_id = prover_solution.commitment();
    let solution = Data::Object(prover_solution);
    let unconfirmed = UnconfirmedSolution { solution_id, solution };
    self.propagate(Message::UnconfirmedSolution(unconfirmed), &[]);
}
fn num_puzzle_instances(&self) -> u8 {
self.puzzle_instances.load(Ordering::Relaxed)
}
/// Adds one to the running puzzle instance counter; in debug builds, also traces the
/// new count.
fn increment_puzzle_instances(&self) {
    let _previous = self.puzzle_instances.fetch_add(1, Ordering::Relaxed);
    if cfg!(debug_assertions) {
        trace!("Number of Instances - {}", self.num_puzzle_instances());
    }
}
/// Subtracts one from the running puzzle instance counter; in debug builds, also traces
/// the new count.
fn decrement_puzzle_instances(&self) {
    let _previous = self.puzzle_instances.fetch_sub(1, Ordering::Relaxed);
    if cfg!(debug_assertions) {
        trace!("Number of Instances - {}", self.num_puzzle_instances());
    }
}
}