#![forbid(unsafe_code)]
#[macro_use]
extern crate tracing;
use snarkos_account::Account;
use snarkos_node_bft::{
BFT,
MAX_BATCH_DELAY_IN_MS,
Primary,
helpers::{
ConsensusReceiver,
PrimaryReceiver,
PrimarySender,
Storage as NarwhalStorage,
fmt_id,
init_consensus_channels,
},
spawn_blocking,
};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_bft_storage_service::BFTPersistentStorage;
use snarkvm::{
ledger::{
block::Transaction,
narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
puzzle::{Solution, SolutionID},
},
prelude::*,
};
use aleo_std::StorageMode;
use anyhow::Result;
use colored::Colorize;
use indexmap::IndexMap;
use lru::LruCache;
use parking_lot::Mutex;
use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
use tokio::{
sync::{OnceCell, oneshot},
task::JoinHandle,
};
#[cfg(feature = "metrics")]
use std::collections::HashMap;
/// Maximum number of deployment transactions held in the inbound queue (1024).
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// Maximum number of execution transactions held in the inbound queue (1024).
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// Maximum number of solutions held in the inbound queue (1024).
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
/// Maximum number of deployments drained from the queue per processing interval
/// (used in `process_unconfirmed_transactions` to throttle deployments relative to executions).
const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
/// An inbound memory-pool queue of pending transactions, partitioned by kind so that
/// deployments and executions can be capped and drained independently.
struct TransactionsQueue<N: Network> {
    // LRU-bounded queue of pending deployment transactions (see `CAPACITY_FOR_DEPLOYMENTS`).
    pub deployments: LruCache<N::TransactionID, Transaction<N>>,
    // LRU-bounded queue of pending execution transactions (see `CAPACITY_FOR_EXECUTIONS`).
    pub executions: LruCache<N::TransactionID, Transaction<N>>,
}
impl<N: Network> Default for TransactionsQueue<N> {
    /// Creates an empty pair of queues with the fixed deployment and execution capacities.
    fn default() -> Self {
        // Both capacities are nonzero compile-time constants, so `unwrap` cannot fail.
        let deployments_capacity = NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap();
        let executions_capacity = NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap();
        Self {
            deployments: LruCache::new(deployments_capacity),
            executions: LruCache::new(executions_capacity),
        }
    }
}
/// The consensus layer: bridges the BFT core with the node's memory pool of
/// unconfirmed solutions and transactions. Cheaply cloneable (all state is behind `Arc`s).
#[derive(Clone)]
pub struct Consensus<N: Network> {
    // The ledger service used to validate transmissions and advance blocks.
    ledger: Arc<dyn LedgerService<N>>,
    // The BFT core (primary + workers).
    bft: BFT<N>,
    // The sender channel into the primary; set exactly once in `run`.
    primary_sender: Arc<OnceCell<PrimarySender<N>>>,
    // Inbound queue of unconfirmed solutions awaiting submission to the primary.
    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
    // Inbound queues of unconfirmed transactions (deployments and executions).
    transactions_queue: Arc<Mutex<TransactionsQueue<N>>>,
    // LRU dedup window of recently-seen solution IDs.
    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
    // LRU dedup window of recently-seen transaction IDs.
    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
    // Arrival timestamps per transmission ID, used to compute transmission latency metrics.
    #[cfg(feature = "metrics")]
    transmissions_queue_timestamps: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
    // Handles of the spawned handler tasks; aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
impl<N: Network> Consensus<N> {
    /// Initializes a new consensus instance on top of the given ledger service.
    ///
    /// `ip` is the optional address for the BFT node, and `trusted_validators` are the
    /// validator addresses to connect to on startup. `storage_mode` selects the backing
    /// storage and, in development mode, carries the development ID.
    pub fn new(
        account: Account<N>,
        ledger: Arc<dyn LedgerService<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        storage_mode: StorageMode,
    ) -> Result<Self> {
        // Extract the development ID, if the node is running in development mode.
        let dev = match storage_mode {
            StorageMode::Development(id) => Some(id),
            StorageMode::Production | StorageMode::Custom(..) => None,
        };
        // Open the persistent transmission storage.
        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode)?);
        // Initialize the Narwhal storage, retaining up to `MAX_GC_ROUNDS` rounds before GC.
        let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64);
        // Initialize the BFT core.
        let bft = BFT::new(account, storage, ledger.clone(), ip, trusted_validators, dev)?;
        // Assemble the consensus instance; queues, dedup windows, and handles start empty.
        Ok(Self {
            ledger,
            bft,
            primary_sender: Default::default(),
            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            transactions_queue: Default::default(),
            // Dedup windows hold 2^16 recently-seen IDs each.
            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
            #[cfg(feature = "metrics")]
            transmissions_queue_timestamps: Default::default(),
            handles: Default::default(),
        })
    }
    /// Starts the consensus instance: stores the primary sender, wires the consensus
    /// channel handlers, then runs the BFT.
    ///
    /// # Panics
    /// Panics if called more than once — the primary sender `OnceCell` may only be set once.
    pub async fn run(&mut self, primary_sender: PrimarySender<N>, primary_receiver: PrimaryReceiver<N>) -> Result<()> {
        info!("Starting the consensus instance...");
        // Set the primary sender; a second call would hit the `expect`.
        self.primary_sender.set(primary_sender.clone()).expect("Primary sender already set");
        // Initialize the consensus channels.
        let (consensus_sender, consensus_receiver) = init_consensus_channels();
        // Start the handlers before running the BFT, so the receivers are live first.
        self.start_handlers(consensus_receiver);
        // Run the BFT, handing it the consensus sender for committed subdags.
        self.bft.run(Some(consensus_sender), primary_sender, primary_receiver).await?;
        Ok(())
    }
    /// Returns the ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }
    /// Returns the BFT instance.
    pub const fn bft(&self) -> &BFT<N> {
        &self.bft
    }
    /// Returns the primary sender.
    ///
    /// # Panics
    /// Panics if `run` has not been called yet (the sender is still unset).
    pub fn primary_sender(&self) -> &PrimarySender<N> {
        self.primary_sender.get().expect("Primary sender not set")
    }
}
impl<N: Network> Consensus<N> {
    /// Returns the number of unconfirmed transmissions held by the BFT workers.
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.bft.num_unconfirmed_transmissions()
    }
    /// Returns the number of unconfirmed ratifications held by the BFT workers.
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.bft.num_unconfirmed_ratifications()
    }
    /// Returns the number of unconfirmed solutions held by the BFT workers.
    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.bft.num_unconfirmed_solutions()
    }
    /// Returns the number of unconfirmed transactions held by the BFT workers.
    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.bft.num_unconfirmed_transactions()
    }
}
impl<N: Network> Consensus<N> {
    /// Returns all unconfirmed transmission IDs: those already in the BFT workers,
    /// followed by those still waiting in the inbound queues.
    pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        let in_workers = self.worker_transmission_ids();
        let in_queues = self.inbound_transmission_ids();
        in_workers.chain(in_queues)
    }
    /// Returns all unconfirmed transmissions: worker-held first, then inbound-queued.
    pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        let in_workers = self.worker_transmissions();
        let in_queues = self.inbound_transmissions();
        in_workers.chain(in_queues)
    }
    /// Returns all unconfirmed solutions: worker-held first, then inbound-queued.
    pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        let in_workers = self.worker_solutions();
        let in_queues = self.inbound_solutions();
        in_workers.chain(in_queues)
    }
    /// Returns all unconfirmed transactions: worker-held first, then inbound-queued.
    pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        let in_workers = self.worker_transactions();
        let in_queues = self.inbound_transactions();
        in_workers.chain(in_queues)
    }
}
impl<N: Network> Consensus<N> {
    /// Returns the transmission IDs currently held by the BFT workers.
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.bft.worker_transmission_ids()
    }
    /// Returns the transmissions currently held by the BFT workers.
    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.bft.worker_transmissions()
    }
    /// Returns the solutions currently held by the BFT workers.
    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.bft.worker_solutions()
    }
    /// Returns the transactions currently held by the BFT workers.
    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.bft.worker_transactions()
    }
}
impl<N: Network> Consensus<N> {
    /// Returns the transmission IDs of the transmissions sitting in the inbound queues.
    pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.inbound_transmissions().map(|(transmission_id, _transmission)| transmission_id)
    }
    /// Returns the inbound-queued transactions and solutions as
    /// `(TransmissionID, Transmission)` pairs — transactions first, then solutions.
    pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        // Wrap each queued transaction, keying it by its ID and checksum.
        let transactions = self.inbound_transactions().map(|(transaction_id, transaction)| {
            let checksum = transaction.to_checksum::<N>().unwrap_or_default();
            (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction))
        });
        // Wrap each queued solution, keying it by its ID and checksum.
        let solutions = self.inbound_solutions().map(|(solution_id, solution)| {
            let checksum = solution.to_checksum::<N>().unwrap_or_default();
            (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution))
        });
        transactions.chain(solutions)
    }
    /// Returns the solutions in the inbound queue, each wrapped as a `Data::Object`.
    pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        // Snapshot the queue under the lock, then iterate over the owned copy.
        let snapshot = self.solutions_queue.lock().clone();
        snapshot.into_iter().map(|(solution_id, solution)| (solution_id, Data::Object(solution)))
    }
    /// Returns the transactions in the inbound queue — deployments first, then executions —
    /// each wrapped as a `Data::Object`.
    pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        // Snapshot both queues under a single lock acquisition.
        let (deployments, executions) = {
            let queue = self.transactions_queue.lock();
            (queue.deployments.clone(), queue.executions.clone())
        };
        deployments
            .into_iter()
            .chain(executions)
            .map(|(transaction_id, transaction)| (transaction_id, Data::Object(transaction)))
    }
}
impl<N: Network> Consensus<N> {
    /// Adds the given unconfirmed solution to the inbound queue, after deduplicating
    /// against recently-seen solutions and the ledger, then triggers queue processing.
    pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
        // Compute the checksum over the serialized solution; it forms part of the transmission ID.
        let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
        #[cfg(feature = "metrics")]
        {
            // Record the arrival time for transmission-latency metrics.
            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
            let timestamp = snarkos_node_bft::helpers::now();
            self.transmissions_queue_timestamps
                .lock()
                .insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
        }
        {
            let solution_id = solution.id();
            // Silently drop solutions already seen within the LRU dedup window.
            if self.seen_solutions.lock().put(solution_id, ()).is_some() {
                return Ok(());
            }
            // Reject solutions already committed to the ledger.
            if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
                bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
            }
            trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
            // Queue the solution; a duplicate already in the queue is an error.
            if self.solutions_queue.lock().put(solution_id, solution).is_some() {
                bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
            }
        }
        self.process_unconfirmed_solutions().await
    }
    /// Drains queued solutions (up to the remaining solution capacity) and forwards
    /// them to the primary. A no-op when the BFT is already at its solution or
    /// transmission limits.
    pub async fn process_unconfirmed_solutions(&self) -> Result<()> {
        let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
        // Back off while the workers are already saturated.
        if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
            || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
        {
            return Ok(());
        }
        // Pop up to `capacity` solutions in LRU order, releasing the lock before sending.
        let solutions = {
            let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
            let mut queue = self.solutions_queue.lock();
            let num_solutions = queue.len().min(capacity);
            (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
        };
        for solution in solutions.into_iter() {
            let solution_id = solution.id();
            trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
            if let Err(e) = self.primary_sender().send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
                // Only warn when synced and past the first blocks of an epoch —
                // presumably to suppress noise from stale solutions around an epoch
                // change; TODO(review) confirm the intent of the `> 10` threshold.
                if self.bft.is_synced() {
                    if self.ledger().latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
                        warn!("Failed to add unconfirmed solution '{}' to the memory pool - {e}", fmt_id(solution_id))
                    };
                }
            }
        }
        Ok(())
    }
    /// Adds the given unconfirmed transaction to the inbound queue, after rejecting fee
    /// transactions and deduplicating against recently-seen transactions and the ledger,
    /// then triggers queue processing.
    pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
        // Compute the checksum over the serialized transaction; it forms part of the transmission ID.
        let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
        #[cfg(feature = "metrics")]
        {
            // Record the arrival time for transmission-latency metrics.
            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
            let timestamp = snarkos_node_bft::helpers::now();
            self.transmissions_queue_timestamps
                .lock()
                .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
        }
        // NOTE(review): unlike the solutions path, the final `process_unconfirmed_transactions`
        // call sits inside this scope block; behavior is identical (no lock is held across it),
        // but the two paths are inconsistently formatted.
        {
            let transaction_id = transaction.id();
            // Standalone fee transactions are never admitted to the memory pool.
            if transaction.is_fee() {
                bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
            }
            // Silently drop transactions already seen within the LRU dedup window.
            if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
                return Ok(());
            }
            // Reject transactions already committed to the ledger.
            if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
                bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
            }
            trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
            // Route the transaction into the deployment or execution queue;
            // a duplicate already in the queue is an error.
            if transaction.is_deploy() {
                if self.transactions_queue.lock().deployments.put(transaction_id, transaction).is_some() {
                    bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
                }
            } else if self.transactions_queue.lock().executions.put(transaction_id, transaction).is_some() {
                bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
            }
            self.process_unconfirmed_transactions().await
        }
    }
    /// Drains queued transactions (throttling deployments to `MAX_DEPLOYMENTS_PER_INTERVAL`)
    /// and forwards them to the primary. A no-op when the BFT is at its transmission limit.
    pub async fn process_unconfirmed_transactions(&self) -> Result<()> {
        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
        // Back off while the workers are already saturated.
        if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
            return Ok(());
        }
        // Select transactions to send, releasing the lock before sending.
        let transactions = {
            let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
            let mut tx_queue = self.transactions_queue.lock();
            // At most one deployment per interval; executions fill the remaining capacity.
            let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
            let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
            // Interleave deployment picks (true) with execution picks (false), then pop each in LRU order.
            let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
            selector_iter
                .filter_map(|select_deployment| {
                    if select_deployment {
                        tx_queue.deployments.pop_lru().map(|(_, tx)| tx)
                    } else {
                        tx_queue.executions.pop_lru().map(|(_, tx)| tx)
                    }
                })
                .collect_vec()
        };
        for transaction in transactions.into_iter() {
            let transaction_id = transaction.id();
            trace!("Adding unconfirmed transaction '{}' to the memory pool...", fmt_id(transaction_id));
            if let Err(e) =
                self.primary_sender().send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await
            {
                // Only warn when synced; failures while syncing are expected.
                if self.bft.is_synced() {
                    warn!(
                        "Failed to add unconfirmed transaction '{}' to the memory pool - {e}",
                        fmt_id(transaction_id)
                    );
                }
            }
        }
        Ok(())
    }
}
impl<N: Network> Consensus<N> {
    /// Spawns the two background handler tasks: one that consumes committed subdags from
    /// the BFT, and one that periodically drains the inbound transaction/solution queues.
    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;
        // Process committed subdags as the BFT emits them.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
            }
        });
        // Periodically drain the inbound queues into the primary.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if let Err(e) = self_.process_unconfirmed_transactions().await {
                    warn!("Cannot process unconfirmed transactions - {e}");
                }
                if let Err(e) = self_.process_unconfirmed_solutions().await {
                    warn!("Cannot process unconfirmed solutions - {e}");
                }
            }
        });
    }
    /// Attempts to advance the ledger with the given committed subdag. On failure, the
    /// subdag's transmissions are reinserted into the memory pool. The outcome is
    /// reported back to the BFT via `callback` either way.
    async fn process_bft_subdag(
        &self,
        subdag: Subdag<N>,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
        callback: oneshot::Sender<Result<()>>,
    ) {
        // Run the (CPU-heavy) block advance off the async executor.
        let self_ = self.clone();
        let transmissions_ = transmissions.clone();
        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) };
        if let Err(e) = &result {
            error!("Unable to advance to the next block - {e}");
            // Return the transmissions to the memory pool so they are not lost.
            self.reinsert_transmissions(transmissions).await;
        }
        // The receiver may have been dropped; ignore the send error.
        callback.send(result).ok();
    }
    /// Builds, checks, and commits the next quorum block from the given subdag and
    /// transmissions, then updates metrics (when enabled).
    fn try_advance_to_next_block(
        &self,
        subdag: Subdag<N>,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        // Capture metric inputs before `subdag` is consumed below.
        #[cfg(feature = "metrics")]
        let start = subdag.leader_certificate().batch_header().timestamp();
        #[cfg(feature = "metrics")]
        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
        #[cfg(feature = "metrics")]
        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();
        // Construct, validate, and commit the next block.
        let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
        self.ledger.check_next_block(&next_block)?;
        self.ledger.advance_to_next_block(&next_block)?;
        // On an epoch boundary, clear queued and worker-held solutions — presumably
        // because solutions target the previous epoch's puzzle; TODO(review) confirm.
        if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
            self.solutions_queue.lock().clear();
            self.bft.primary().clear_worker_solutions();
        }
        #[cfg(feature = "metrics")]
        {
            // Elapsed time from the leader certificate's timestamp to now.
            let elapsed = std::time::Duration::from_secs((snarkos_node_bft::helpers::now() - start) as u64);
            let next_block_timestamp = next_block.header().metadata().timestamp();
            let block_latency = next_block_timestamp - current_block_timestamp;
            let proof_target = next_block.header().proof_target();
            let coinbase_target = next_block.header().coinbase_target();
            let cumulative_proof_target = next_block.header().cumulative_proof_target();
            metrics::add_transmission_latency_metric(&self.transmissions_queue_timestamps, &next_block);
            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
        }
        Ok(())
    }
    /// Reinserts each of the given transmissions into the memory pool, logging (but not
    /// propagating) any individual failure.
    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
        for (transmission_id, transmission) in transmissions.into_iter() {
            if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await {
                warn!(
                    "Unable to reinsert transmission {}.{} into the memory pool - {e}",
                    fmt_id(transmission_id),
                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
                );
            }
        }
    }
    /// Reinserts a single transmission into the memory pool by resending it to the
    /// primary, and waits for the primary's verdict via a oneshot callback.
    /// Ratifications are a no-op; a mismatched ID/transmission pair is an error.
    async fn reinsert_transmission(
        &self,
        transmission_id: TransmissionID<N>,
        transmission: Transmission<N>,
    ) -> Result<()> {
        // Initialize the callback for the primary's response.
        let (callback, callback_receiver) = oneshot::channel();
        match (transmission_id, transmission) {
            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()),
            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
                self.primary_sender().tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
            }
            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
                self.primary_sender().tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
            }
            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
        }
        // Await and propagate the primary's result.
        callback_receiver.await?
    }
    /// Spawns a task and tracks its handle for abortion on shutdown.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }
    /// Shuts down the BFT, then aborts all tracked handler tasks.
    pub async fn shut_down(&self) {
        info!("Shutting down consensus...");
        self.bft.shut_down().await;
        self.handles.lock().iter().for_each(|handle| handle.abort());
    }
}