use crate::{
block_storage::{
tracing::{observe_block, BlockStage},
BlockReader, BlockRetriever, BlockStore,
},
counters,
error::{error_kind, VerifyError},
liveness::{
proposal_generator::ProposalGenerator,
proposer_election::ProposerElection,
round_state::{NewRoundEvent, NewRoundReason, RoundState, RoundStateLogSchema},
unequivocal_proposer_election::UnequivocalProposerElection,
},
logging::{LogEvent, LogSchema},
metrics_safety_rules::MetricsSafetyRules,
monitor,
network::NetworkSender,
network_interface::ConsensusMsg,
pending_votes::VoteReceptionResult,
persistent_liveness_storage::PersistentLivenessStorage,
};
use anyhow::{bail, ensure, Context, Result};
use aptos_infallible::{checked, Mutex};
use aptos_logger::prelude::*;
use aptos_types::{
epoch_state::EpochState, on_chain_config::OnChainConsensusConfig,
validator_verifier::ValidatorVerifier,
};
use channel::aptos_channel;
use consensus_types::{
block::Block,
common::{Author, Round},
experimental::{commit_decision::CommitDecision, commit_vote::CommitVote},
proposal_msg::ProposalMsg,
quorum_cert::QuorumCert,
sync_info::SyncInfo,
timeout_2chain::TwoChainTimeoutCertificate,
vote::Vote,
vote_msg::VoteMsg,
};
use fail::fail_point;
use futures::{channel::oneshot, FutureExt, StreamExt};
#[cfg(test)]
use safety_rules::ConsensusState;
use safety_rules::TSafetyRules;
use serde::Serialize;
use std::{mem::Discriminant, sync::Arc, time::Duration};
use termion::color::*;
/// A consensus event as received, before `UnverifiedEvent::verify` has turned it into a
/// `VerifiedEvent`. Every variant boxes its message payload.
#[derive(Serialize, Clone)]
pub enum UnverifiedEvent {
/// A proposal message.
ProposalMsg(Box<ProposalMsg>),
/// A vote message.
VoteMsg(Box<VoteMsg>),
/// A sync-info message; `verify` forwards it without checking it.
SyncInfo(Box<SyncInfo>),
/// A commit vote.
CommitVote(Box<CommitVote>),
/// A commit decision.
CommitDecision(Box<CommitDecision>),
}
impl UnverifiedEvent {
    /// Checks the wrapped message against `validator` and converts it into the matching
    /// [`VerifiedEvent`] variant.
    ///
    /// `SyncInfo` is the one exception: it is not checked here and is forwarded as
    /// `VerifiedEvent::UnverifiedSyncInfo`.
    ///
    /// # Errors
    /// Propagates the error returned by the wrapped message's own `verify` call.
    pub fn verify(self, validator: &ValidatorVerifier) -> Result<VerifiedEvent, VerifyError> {
        let verified = match self {
            Self::ProposalMsg(proposal) => {
                proposal.verify(validator)?;
                VerifiedEvent::ProposalMsg(proposal)
            }
            Self::VoteMsg(vote_msg) => {
                vote_msg.verify(validator)?;
                VerifiedEvent::VoteMsg(vote_msg)
            }
            // Deliberately passed through unchecked; the variant name records that.
            Self::SyncInfo(sync_info) => VerifiedEvent::UnverifiedSyncInfo(sync_info),
            Self::CommitVote(commit_vote) => {
                commit_vote.verify(validator)?;
                VerifiedEvent::CommitVote(commit_vote)
            }
            Self::CommitDecision(commit_decision) => {
                commit_decision.verify(validator)?;
                VerifiedEvent::CommitDecision(commit_decision)
            }
        };
        Ok(verified)
    }

    /// Returns the epoch reported by the wrapped message.
    pub fn epoch(&self) -> u64 {
        match self {
            Self::ProposalMsg(proposal) => proposal.epoch(),
            Self::VoteMsg(vote_msg) => vote_msg.epoch(),
            Self::SyncInfo(sync_info) => sync_info.epoch(),
            Self::CommitVote(commit_vote) => commit_vote.epoch(),
            Self::CommitDecision(commit_decision) => commit_decision.epoch(),
        }
    }
}
impl From<ConsensusMsg> for UnverifiedEvent {
    /// Maps the five consensus network messages handled here onto their
    /// `UnverifiedEvent` counterparts.
    ///
    /// # Panics
    /// Panics (via `unreachable!`) for any other `ConsensusMsg` variant.
    fn from(value: ConsensusMsg) -> Self {
        match value {
            ConsensusMsg::ProposalMsg(payload) => Self::ProposalMsg(payload),
            ConsensusMsg::VoteMsg(payload) => Self::VoteMsg(payload),
            ConsensusMsg::SyncInfo(payload) => Self::SyncInfo(payload),
            ConsensusMsg::CommitVoteMsg(payload) => Self::CommitVote(payload),
            ConsensusMsg::CommitDecisionMsg(payload) => Self::CommitDecision(payload),
            _ => unreachable!("Unexpected conversion"),
        }
    }
}
/// Events consumed by the round manager's event loop (`RoundManager::start`).
///
/// The network-originated variants are produced by `UnverifiedEvent::verify`;
/// `LocalTimeout` and `Shutdown` are not produced by that conversion.
#[derive(Debug)]
pub enum VerifiedEvent {
/// A proposal message that passed `verify`.
ProposalMsg(Box<ProposalMsg>),
/// A vote message that passed `verify`.
VoteMsg(Box<VoteMsg>),
/// A sync-info message that has NOT been verified yet; `RoundManager::sync_up`
/// verifies it when it carries newer certificates.
UnverifiedSyncInfo(Box<SyncInfo>),
/// A commit vote that passed `verify`; `RoundManager::start` does not handle it.
CommitVote(Box<CommitVote>),
/// A commit decision that passed `verify`; `RoundManager::start` does not handle it.
CommitDecision(Box<CommitDecision>),
/// A local timeout for the given round.
LocalTimeout(Round),
/// A request to stop the event loop; the sender is used to acknowledge the shutdown.
Shutdown(oneshot::Sender<()>),
}
// Unit tests, compiled only for test builds; the module source is `round_manager_test.rs`.
#[cfg(test)]
#[path = "round_manager_test.rs"]
mod round_manager_test;
// Fuzzing support, compiled only with the `fuzzing` feature; the module source is
// `round_manager_fuzzing.rs`.
#[cfg(feature = "fuzzing")]
#[path = "round_manager_fuzzing.rs"]
pub mod round_manager_fuzzing;
/// Drives consensus rounds: reacts to proposals, votes, sync info and local timeouts,
/// generating proposals and votes through the components it owns.
pub struct RoundManager {
// Epoch number and validator verifier for the current epoch.
epoch_state: EpochState,
// Source of sync info, certificates and executed blocks.
block_store: Arc<BlockStore>,
// Tracks the current round, its deadline, and the vote sent in it.
round_state: RoundState,
// Decides which author is a valid proposer for a round and validates proposals.
proposer_election: UnequivocalProposerElection,
// Produces proposals and NIL blocks.
proposal_generator: ProposalGenerator,
// Signs proposals, votes and timeouts; shared behind a mutex.
safety_rules: Arc<Mutex<MetricsSafetyRules>>,
// Outbound network handle (broadcasts and direct sends).
network: NetworkSender,
// Persists the last vote (see `execute_and_vote`).
storage: Arc<dyn PersistentLivenessStorage>,
// When set, the manager broadcasts sync info on timeout and refuses to vote.
sync_only: bool,
// On-chain consensus configuration (decoupled execution flag, back-pressure limit).
onchain_config: OnChainConsensusConfig,
}
impl RoundManager {
/// Builds a `RoundManager` from its collaborators.
///
/// Publishes the initial `sync_only` and `decoupled_execution` values to the operational
/// gauges and wraps `proposer_election` in an [`UnequivocalProposerElection`].
pub fn new(
    epoch_state: EpochState,
    block_store: Arc<BlockStore>,
    round_state: RoundState,
    proposer_election: Box<dyn ProposerElection + Send + Sync>,
    proposal_generator: ProposalGenerator,
    safety_rules: Arc<Mutex<MetricsSafetyRules>>,
    network: NetworkSender,
    storage: Arc<dyn PersistentLivenessStorage>,
    sync_only: bool,
    onchain_config: OnChainConsensusConfig,
) -> Self {
    let sync_only_gauge = counters::OP_COUNTERS.gauge("sync_only");
    sync_only_gauge.set(sync_only as i64);

    let decoupled_gauge = counters::OP_COUNTERS.gauge("decoupled_execution");
    decoupled_gauge.set(onchain_config.decoupled_execution() as i64);

    // Shadow the boxed election with its wrapped form so the struct literal stays uniform.
    let proposer_election = UnequivocalProposerElection::new(proposer_election);

    Self {
        epoch_state,
        block_store,
        round_state,
        proposer_election,
        proposal_generator,
        safety_rules,
        network,
        storage,
        sync_only,
        onchain_config,
    }
}
fn decoupled_execution(&self) -> bool {
self.onchain_config.decoupled_execution()
}
fn back_pressure_limit(&self) -> u64 {
self.onchain_config.back_pressure_limit()
}
/// Creates a `BlockRetriever` for `author`, giving it a clone of the network sender and
/// the verifier's ordered account addresses.
fn create_block_retriever(&self, author: Author) -> BlockRetriever {
    let ordered_validators = self
        .epoch_state
        .verifier
        .get_ordered_account_addresses_iter()
        .collect();
    BlockRetriever::new(self.network.clone(), author, ordered_validators)
}
/// Reacts to the start of a new round.
///
/// Updates the round counters and logs the event. If this node is a valid proposer for
/// the round, it arms the leader timeout, generates a proposal and broadcasts it.
///
/// # Errors
/// Returns the error from proposal generation, or (with the `failpoints` feature) the
/// injected reconfiguration error.
async fn process_new_round_event(
    &mut self,
    new_round_event: NewRoundEvent,
) -> anyhow::Result<()> {
    let round = new_round_event.round;
    counters::CURRENT_ROUND.set(round as i64);
    counters::ROUND_TIMEOUT_MS.set(new_round_event.timeout.as_millis() as i64);
    match new_round_event.reason {
        NewRoundReason::QCReady => counters::QC_ROUNDS_COUNT.inc(),
        NewRoundReason::Timeout => counters::TIMEOUT_ROUNDS_COUNT.inc(),
    };
    info!(
        self.new_log(LogEvent::NewRound),
        reason = new_round_event.reason
    );

    let is_proposer = self
        .proposer_election
        .is_valid_proposer(self.proposal_generator.author(), round);
    if !is_proposer {
        return Ok(());
    }

    self.round_state.setup_leader_timeout();
    let proposal_msg = self.generate_proposal(new_round_event).await?;
    let mut network = self.network.clone();
    #[cfg(feature = "failpoints")]
    {
        if self.check_whether_to_inject_reconfiguration_error() {
            self.attempt_to_inject_reconfiguration_error(&proposal_msg)
                .await?;
        }
    }
    network.broadcast_proposal(proposal_msg).await;
    counters::PROPOSALS_COUNT.inc();
    Ok(())
}
/// Generates, signs and wraps a proposal for the round in `new_round_event`.
///
/// A future that broadcasts the current sync info is handed to the proposal generator
/// as a callback. The returned message carries the sync info read after signing.
///
/// # Errors
/// Fails if the proposal generator or the safety rules' `sign_proposal` fails.
async fn generate_proposal(
    &mut self,
    new_round_event: NewRoundEvent,
) -> anyhow::Result<ProposalMsg> {
    // Snapshot captured up front; the callback broadcasts this exact snapshot.
    let sync_info_snapshot = self.block_store.sync_info();
    let mut broadcaster = self.network.clone();
    let broadcast_sync_info = async move {
        broadcaster.broadcast_sync_info(sync_info_snapshot).await;
    }
    .boxed();

    let block_data = self
        .proposal_generator
        .generate_proposal(
            new_round_event.round,
            &mut self.proposer_election,
            broadcast_sync_info,
        )
        .await?;
    let signature = self.safety_rules.lock().sign_proposal(&block_data)?;
    let signed_proposal =
        Block::new_proposal_from_block_data_and_signature(block_data, signature);
    observe_block(signed_proposal.timestamp_usecs(), BlockStage::SIGNED);
    info!(self.new_log(LogEvent::Propose), "{}", signed_proposal);

    let proposal_msg = ProposalMsg::new(signed_proposal, self.block_store.sync_info());
    Ok(proposal_msg)
}
/// Entry point for a received proposal message.
///
/// Syncs up to the sender's state and, if the proposal is for the current round,
/// processes the proposal.
///
/// # Errors
/// Fails when syncing fails, when the proposal is stale (older than the current round),
/// or when processing the proposal fails. The `consensus::process_proposal_msg` fail
/// point can inject an error.
pub async fn process_proposal_msg(&mut self, proposal_msg: ProposalMsg) -> anyhow::Result<()> {
    fail_point!("consensus::process_proposal_msg", |_| {
        Err(anyhow::anyhow!("Injected error in process_proposal_msg"))
    });

    observe_block(
        proposal_msg.proposal().timestamp_usecs(),
        BlockStage::RECEIVED,
    );
    info!(
        self.new_log(LogEvent::ReceiveProposal)
            .remote_peer(proposal_msg.proposer()),
        block_hash = proposal_msg.proposal().id(),
        block_parent_hash = proposal_msg.proposal().quorum_cert().certified_block().id(),
    );

    let is_current_round = self
        .ensure_round_and_sync_up(
            proposal_msg.proposal().round(),
            proposal_msg.sync_info(),
            proposal_msg.proposer(),
        )
        .await
        .context("[RoundManager] Process proposal")?;

    if !is_current_round {
        bail!(
            "Stale proposal {}, current round {}",
            proposal_msg.proposal(),
            self.round_state.current_round()
        );
    }
    self.process_proposal(proposal_msg.take_proposal()).await
}
/// Brings local state up to date with `sync_info` received from `author`.
///
/// Does nothing when `sync_info` has no certificates newer than the local ones.
/// Otherwise the sync info is verified, its certificates are added to the block store,
/// and the certificates are processed. Certificate processing runs even if adding the
/// certificates failed; the add result is returned afterwards.
///
/// # Errors
/// Fails on an invalid sync info (also logged as a security event), on a failure while
/// processing certificates, or with the error from adding the certificates.
async fn sync_up(&mut self, sync_info: &SyncInfo, author: Author) -> anyhow::Result<()> {
    let local_sync_info = self.block_store.sync_info();
    if !sync_info.has_newer_certificates(&local_sync_info) {
        return Ok(());
    }

    info!(
        self.new_log(LogEvent::ReceiveNewCertificate)
            .remote_peer(author),
        "Local state {}, remote state {}", local_sync_info, sync_info
    );

    // The sync info arrives unverified, so check it before acting on it.
    sync_info
        .verify(&self.epoch_state().verifier)
        .map_err(|e| {
            error!(
                SecurityEvent::InvalidSyncInfoMsg,
                sync_info = sync_info,
                remote_peer = author,
                error = ?e,
            );
            VerifyError::from(e)
        })?;

    let retriever = self.create_block_retriever(author);
    let add_certs_result = self.block_store.add_certs(sync_info, retriever).await;
    self.process_certificates().await?;
    add_certs_result
}
pub async fn ensure_round_and_sync_up(
&mut self,
message_round: Round,
sync_info: &SyncInfo,
author: Author,
) -> anyhow::Result<bool> {
if message_round < self.round_state.current_round() {
return Ok(false);
}
self.sync_up(sync_info, author).await?;
ensure!(
message_round == self.round_state.current_round(),
"After sync, round {} doesn't match local {}",
message_round,
self.round_state.current_round()
);
Ok(true)
}
/// Entry point for a standalone sync-info message from `peer`.
///
/// Treats the message as being for round `highest_round + 1` and syncs up accordingly.
/// The boolean outcome of the round check is discarded.
///
/// # Errors
/// Fails on overflow when computing the round, or when the round check / sync fails.
/// The `consensus::process_sync_info_msg` fail point can inject an error.
pub async fn process_sync_info_msg(
    &mut self,
    sync_info: SyncInfo,
    peer: Author,
) -> anyhow::Result<()> {
    fail_point!("consensus::process_sync_info_msg", |_| {
        Err(anyhow::anyhow!("Injected error in process_sync_info_msg"))
    });
    info!(
        self.new_log(LogEvent::ReceiveSyncInfo).remote_peer(peer),
        "{}", sync_info
    );

    let implied_round = checked!((sync_info.highest_round()) + 1)?;
    self.ensure_round_and_sync_up(implied_round, &sync_info, peer)
        .await
        .context("[RoundManager] Failed to process sync info msg")
        .map(|_| ())
}
/// Tells whether the node should currently only sync instead of voting.
///
/// Without decoupled execution this is just the configured `sync_only` flag. With
/// decoupled execution it is also true when the ordered root is more than
/// `back_pressure_limit` rounds ahead of the commit root; in that mode the `sync_only`
/// and `back_pressure` gauges are refreshed on every call.
fn sync_only(&self) -> bool {
    if !self.decoupled_execution() {
        return self.sync_only;
    }

    let commit_round = self.block_store.commit_root().round();
    let ordered_round = self.block_store.ordered_root().round();
    let should_sync_only =
        self.sync_only || ordered_round > self.back_pressure_limit() + commit_round;

    counters::OP_COUNTERS
        .gauge("sync_only")
        .set(should_sync_only as i64);
    // Gap between what has been ordered and what has been committed.
    counters::OP_COUNTERS
        .gauge("back_pressure")
        .set((ordered_round - commit_round) as i64);

    should_sync_only
}
/// Handles a local timeout for `round`.
///
/// In sync-only mode the current sync info is broadcast and an error is returned.
/// Otherwise a timeout vote is built (reusing this round's vote if one was already
/// sent, else voting for a NIL block), signed with a 2-chain timeout if it does not
/// already carry one, recorded, and broadcast to all peers.
///
/// # Errors
/// Returns `Ok(())` only when the round state ignores the timeout. Every other path
/// ends in an error: sync-only mode, a failure while building or signing the vote, or
/// the final "Round {} timeout" error that reports the timeout to the caller.
pub async fn process_local_timeout(&mut self, round: Round) -> anyhow::Result<()> {
// Nothing to do when the round state does not accept this timeout.
if !self.round_state.process_local_timeout(round) {
return Ok(());
}
// In sync-only mode, share our sync info instead of voting.
if self.sync_only() {
self.network
.broadcast_sync_info(self.block_store.sync_info())
.await;
bail!("[RoundManager] sync_only flag is set, broadcasting SyncInfo");
}
// Reuse the vote already sent for this round, if any; otherwise vote for a NIL block.
let (is_nil_vote, mut timeout_vote) = match self.round_state.vote_sent() {
Some(vote) if vote.vote_data().proposed().round() == round => {
(vote.vote_data().is_for_nil(), vote)
}
_ => {
let nil_block = self
.proposal_generator
.generate_nil_block(round, &mut self.proposer_election)?;
info!(
self.new_log(LogEvent::VoteNIL),
"Planning to vote for a NIL block {}", nil_block
);
counters::VOTE_NIL_COUNT.inc();
let nil_vote = self.execute_and_vote(nil_block).await?;
(true, nil_vote)
}
};
// Attach a signed 2-chain timeout unless the vote already is a timeout vote.
if !timeout_vote.is_timeout() {
let timeout = timeout_vote
.generate_2chain_timeout(self.block_store.highest_quorum_cert().as_ref().clone());
let signature = self
.safety_rules
.lock()
.sign_timeout_with_qc(
&timeout,
self.block_store.highest_2chain_timeout_cert().as_deref(),
)
.context("[RoundManager] SafetyRules signs 2-chain timeout")?;
timeout_vote.add_2chain_timeout(timeout, signature);
}
// Record the vote locally before broadcasting it.
self.round_state.record_vote(timeout_vote.clone());
let timeout_vote_msg = VoteMsg::new(timeout_vote, self.block_store.sync_info());
self.network.broadcast_timeout_vote(timeout_vote_msg).await;
error!(
round = round,
remote_peer = self.proposer_election.get_valid_proposer(round),
voted_nil = is_nil_vote,
event = LogEvent::Timeout,
);
// The timeout itself is reported to the caller as an error.
bail!("Round {} timeout, broadcast to all peers", round);
}
/// Feeds the block store's current sync info to the round state and, if that yields a
/// new round event, processes it.
///
/// # Errors
/// Propagates the error from processing the new round event.
async fn process_certificates(&mut self) -> anyhow::Result<()> {
    let sync_info = self.block_store.sync_info();
    let maybe_new_round = self.round_state.process_certificates(sync_info);
    match maybe_new_round {
        Some(new_round_event) => self.process_new_round_event(new_round_event).await,
        None => Ok(()),
    }
}
/// Validates a proposal for the current round, votes on it, and sends the vote to the
/// valid proposer of the next round.
///
/// The proposal is rejected when the proposer election does not accept it, when its
/// `failed_authors` list is missing or differs from the locally computed one, or when
/// its timestamp is not before the current round deadline.
///
/// # Errors
/// Fails on any of the rejections above or when executing / voting fails.
///
/// # Panics
/// Panics if the proposal has no author.
async fn process_proposal(&mut self, proposal: Block) -> Result<()> {
    let author = proposal
        .author()
        .expect("Proposal should be verified having an author");

    let accepted_by_election = self.proposer_election.is_valid_proposal(&proposal);
    ensure!(
        accepted_by_election,
        "[RoundManager] Proposer {} for block {} is not a valid proposer for this round or created duplicate proposal",
        author,
        proposal,
    );

    let certified_round = proposal.quorum_cert().certified_block().round();
    let expected_failed_authors = self.proposal_generator.compute_failed_authors(
        proposal.round(),
        certified_round,
        false,
        &mut self.proposer_election,
    );
    // A proposal without a failed_authors list is rejected as well.
    let failed_authors_match = proposal
        .block_data()
        .failed_authors()
        .map_or(false, |listed| *listed == expected_failed_authors);
    ensure!(
        failed_authors_match,
        "[RoundManager] Proposal for block {} has invalid failed_authors list {:?}, expected {:?}",
        proposal.round(),
        proposal.block_data().failed_authors(),
        expected_failed_authors,
    );

    let block_time_since_epoch = Duration::from_micros(proposal.timestamp_usecs());
    let before_deadline = block_time_since_epoch < self.round_state.current_round_deadline();
    ensure!(
        before_deadline,
        "[RoundManager] Waiting until proposal block timestamp usecs {:?} \
        would exceed the round duration {:?}, hence will not vote for this round",
        block_time_since_epoch,
        self.round_state.current_round_deadline(),
    );

    observe_block(proposal.timestamp_usecs(), BlockStage::SYNCED);

    // Remember the round now: `proposal` is moved into `execute_and_vote`.
    let proposal_round = proposal.round();
    let vote = self
        .execute_and_vote(proposal)
        .await
        .context("[RoundManager] Process proposal")?;

    let next_proposer = self
        .proposer_election
        .get_valid_proposer(proposal_round + 1);
    info!(
        self.new_log(LogEvent::Vote).remote_peer(next_proposer),
        "{}", vote
    );

    self.round_state.record_vote(vote.clone());
    let vote_msg = VoteMsg::new(vote, self.block_store.sync_info());
    self.network.send_vote(vote_msg, vec![next_proposer]).await;
    Ok(())
}
/// Executes `proposed_block`, inserts it into the block store, and produces a signed
/// vote for it.
///
/// The vote is persisted through `storage` before it is returned; recording it in the
/// round state and sending it are left to the caller.
///
/// # Errors
/// Fails when execution/insertion fails, when a vote was already sent in this round,
/// when the node is in sync-only mode, when the safety rules reject the vote proposal,
/// or when persisting the vote fails.
async fn execute_and_vote(&mut self, proposed_block: Block) -> anyhow::Result<Vote> {
    let executed_block = self
        .block_store
        .execute_and_insert_block(proposed_block)
        .await
        .context("[RoundManager] Failed to execute_and_insert the block")?;

    // Checked only after execution: the block is inserted even if we end up not voting.
    ensure!(
        self.round_state.vote_sent().is_none(),
        "[RoundManager] Already vote on this round {}",
        self.round_state.current_round()
    );
    ensure!(
        !self.sync_only(),
        "[RoundManager] sync_only flag is set, stop voting"
    );

    let vote_proposal = executed_block.vote_proposal(self.decoupled_execution());
    let vote_result = self.safety_rules.lock().construct_and_sign_vote_two_chain(
        &vote_proposal,
        self.block_store.highest_2chain_timeout_cert().as_deref(),
    );
    // Build the rejection message lazily so the successful path does not pay for
    // formatting the block.
    let vote = vote_result.with_context(|| {
        format!(
            "[RoundManager] SafetyRules {}Rejected{} {}",
            Fg(Red),
            Fg(Reset),
            executed_block.block()
        )
    })?;

    if !executed_block.block().is_nil_block() {
        observe_block(executed_block.block().timestamp_usecs(), BlockStage::VOTED);
    }

    self.storage
        .save_vote(&vote)
        .context("[RoundManager] Fail to persist last vote")?;
    Ok(vote)
}
/// Entry point for a received vote message.
///
/// Syncs up with the sender and, if the vote is for the current round, processes the
/// vote. Votes for older rounds are ignored without error.
///
/// # Errors
/// Fails when syncing fails or when processing the vote fails. The
/// `consensus::process_vote_msg` fail point can inject an error.
pub async fn process_vote_msg(&mut self, vote_msg: VoteMsg) -> anyhow::Result<()> {
    fail_point!("consensus::process_vote_msg", |_| {
        Err(anyhow::anyhow!("Injected error in process_vote_msg"))
    });

    let vote = vote_msg.vote();
    let is_current_round = self
        .ensure_round_and_sync_up(
            vote.vote_data().proposed().round(),
            vote_msg.sync_info(),
            vote.author(),
        )
        .await
        .context("[RoundManager] Stop processing vote")?;
    if !is_current_round {
        return Ok(());
    }

    self.process_vote(vote)
        .await
        .context("[RoundManager] Add a new vote")
}
/// Adds `vote` to the pending votes of the current round and acts on the outcome.
///
/// * A non-timeout vote is accepted only if this node is a valid proposer for the next
///   round.
/// * A vote for a block that already has a quorum certificate is ignored.
/// * A newly formed QC or 2-chain TC is inserted and certificates are processed.
/// * An `EchoTimeout` outcome triggers a local timeout unless this node has already
///   voted for timeout.
///
/// # Errors
/// Fails when this node is not the next proposer for a regular vote, when handling a
/// new certificate or the echoed timeout fails, and when the vote is rejected. Any
/// outcome other than the benign ones listed in the final `match` is returned as an
/// error rather than being silently dropped.
async fn process_vote(&mut self, vote: &Vote) -> anyhow::Result<()> {
    let round = vote.vote_data().proposed().round();

    info!(
        self.new_log(LogEvent::ReceiveVote)
            .remote_peer(vote.author()),
        vote = %vote,
        vote_epoch = vote.vote_data().proposed().epoch(),
        vote_round = vote.vote_data().proposed().round(),
        vote_id = vote.vote_data().proposed().id(),
        vote_state = vote.vote_data().proposed().executed_state_id(),
        is_timeout = vote.is_timeout(),
    );

    if !vote.is_timeout() {
        // Regular votes are only meaningful to a proposer of the next round.
        let next_round = round + 1;
        ensure!(
            self.proposer_election
                .is_valid_proposer(self.proposal_generator.author(), next_round),
            "[RoundManager] Received {}, but I am not a valid proposer for round {}, ignore.",
            vote,
            next_round
        );
    }

    // Nothing to aggregate if the block is already certified.
    let block_id = vote.vote_data().proposed().id();
    if self
        .block_store
        .get_quorum_cert_for_block(block_id)
        .is_some()
    {
        return Ok(());
    }

    match self
        .round_state
        .insert_vote(vote, &self.epoch_state.verifier)
    {
        VoteReceptionResult::NewQuorumCertificate(qc) => {
            if !vote.is_timeout() {
                observe_block(
                    qc.certified_block().timestamp_usecs(),
                    BlockStage::QC_AGGREGATED,
                );
            }
            self.new_qc_aggregated(qc, vote.author()).await
        }
        VoteReceptionResult::New2ChainTimeoutCertificate(tc) => {
            self.new_2chain_tc_aggregated(tc).await
        }
        VoteReceptionResult::EchoTimeout(_) if !self.round_state.is_vote_timeout() => {
            self.process_local_timeout(round).await
        }
        // Benign outcomes: the vote was stored (or already known) and nothing more is
        // required. NOTE(review): `VoteAdded` and `DuplicateVote` are declared in
        // `pending_votes`, outside this file -- confirm the variant list there.
        VoteReceptionResult::VoteAdded(_)
        | VoteReceptionResult::EchoTimeout(_)
        | VoteReceptionResult::DuplicateVote => Ok(()),
        // Everything else is a rejected vote; surface it so the event loop counts and
        // logs it.
        rejected => Err(anyhow::anyhow!("{:?}", rejected)),
    }
}
/// Handles a quorum certificate that was just aggregated from votes.
///
/// Inserts the QC into the block store, using `preferred_peer` for block retrieval,
/// then processes certificates. Certificate processing runs even if the insertion
/// failed; the insertion result is returned afterwards.
///
/// # Errors
/// Fails when certificate processing fails, otherwise returns the insertion result.
async fn new_qc_aggregated(
    &mut self,
    qc: Arc<QuorumCert>,
    preferred_peer: Author,
) -> anyhow::Result<()> {
    let mut retriever = self.create_block_retriever(preferred_peer);
    let insert_result = self
        .block_store
        .insert_quorum_cert(&qc, &mut retriever)
        .await
        .context("[RoundManager] Failed to process a newly aggregated QC");
    self.process_certificates().await?;
    insert_result
}
/// Handles a 2-chain timeout certificate that was just aggregated from timeout votes.
///
/// Inserts the TC into the block store, then processes certificates. Certificate
/// processing runs even if the insertion failed; the insertion result is returned
/// afterwards.
///
/// # Errors
/// Fails when certificate processing fails, otherwise returns the insertion result.
async fn new_2chain_tc_aggregated(
    &mut self,
    tc: Arc<TwoChainTimeoutCertificate>,
) -> anyhow::Result<()> {
    let insert_result = self
        .block_store
        .insert_2chain_timeout_certificate(tc)
        .context("[RoundManager] Failed to process a newly aggregated 2-chain TC");
    self.process_certificates().await?;
    insert_result
}
/// Starts the round state from the certificates already in the block store.
///
/// Restores `last_vote_sent`, if any, into the round state and then processes the
/// resulting new round event. An error from that processing is logged, not returned.
///
/// # Panics
/// Panics if the existing certificates do not produce a new round event.
pub async fn init(&mut self, last_vote_sent: Option<Vote>) {
    let sync_info = self.block_store.sync_info();
    let new_round_event = self
        .round_state
        .process_certificates(sync_info)
        .expect("Can not jump start a round_state from existing certificates.");

    if let Some(vote) = last_vote_sent {
        self.round_state.record_vote(vote);
    }

    let start_result = self.process_new_round_event(new_round_event).await;
    if let Err(e) = start_result {
        error!(error = ?e, "[RoundManager] Error during start");
    }
}
/// Test-only: fetches the consensus state from the safety rules.
///
/// # Panics
/// Panics if the safety rules return an error.
#[cfg(test)]
pub fn consensus_state(&mut self) -> ConsensusState {
    let state = self.safety_rules.lock().consensus_state();
    state.unwrap()
}
/// Test-only: replaces the safety rules instance used by this manager.
#[cfg(test)]
pub fn set_safety_rules(&mut self, safety_rules: Arc<Mutex<MetricsSafetyRules>>) {
    self.safety_rules = safety_rules;
}
pub fn epoch_state(&self) -> &EpochState {
&self.epoch_state
}
pub fn round_state(&self) -> &RoundState {
&self.round_state
}
/// Creates a log schema for `event`, tagged with the current round and the epoch.
fn new_log(&self, event: LogEvent) -> LogSchema {
    let schema = LogSchema::new(event);
    let schema = schema.round(self.round_state.current_round());
    schema.epoch(self.epoch_state.epoch)
}
/// Runs the round manager's event loop until the channel closes or a `Shutdown` event
/// arrives.
///
/// Each event is dispatched to its handler inside a `monitor!` wrapper. A handler error
/// does not stop the loop: it is annotated with the peer, counted in `ERROR_COUNT`, and
/// logged together with the round state.
///
/// # Panics
/// Panics if the shutdown acknowledgement cannot be sent, or if an event other than
/// the handled variants (for example `CommitVote` or `CommitDecision`) is received.
pub async fn start(
mut self,
mut event_rx: aptos_channel::Receiver<
(Author, Discriminant<VerifiedEvent>),
(Author, VerifiedEvent),
>,
) {
info!(epoch = self.epoch_state().epoch, "RoundManager started");
while let Some((peer_id, event)) = event_rx.next().await {
let result = match event {
VerifiedEvent::ProposalMsg(proposal_msg) => {
monitor!(
"process_proposal",
self.process_proposal_msg(*proposal_msg).await
)
}
VerifiedEvent::VoteMsg(vote_msg) => {
monitor!("process_vote", self.process_vote_msg(*vote_msg).await)
}
VerifiedEvent::UnverifiedSyncInfo(sync_info) => {
monitor!(
"process_sync_info",
self.process_sync_info_msg(*sync_info, peer_id).await
)
}
VerifiedEvent::LocalTimeout(round) => monitor!(
"process_local_timeout",
self.process_local_timeout(round).await
),
// Acknowledge the shutdown request and leave the loop.
VerifiedEvent::Shutdown(ack_sender) => {
ack_sender
.send(())
.expect("[RoundManager] Fail to ack shutdown");
break;
}
// Any other variant is not expected on this channel.
unexpected_event => unreachable!("Unexpected event: {:?}", unexpected_event),
}
// Record which peer the failing event came from.
.with_context(|| format!("from peer {}", peer_id));
let round_state = self.round_state();
match result {
Ok(_) => trace!(RoundStateLogSchema::new(round_state)),
Err(e) => {
counters::ERROR_COUNT.inc();
error!(error = ?e, kind = error_kind(&e), RoundStateLogSchema::new(round_state));
}
}
}
info!(epoch = self.epoch_state().epoch, "RoundManager stopped");
}
/// Failpoints-only: reports whether the `consensus::inject_reconfiguration_error` fail
/// point is active. The fail point's closure returns `true`; otherwise the function
/// falls through to `false`.
#[cfg(feature = "failpoints")]
fn check_whether_to_inject_reconfiguration_error(&self) -> bool {
fail_point!("consensus::inject_reconfiguration_error", |_| true);
false
}
/// Failpoints-only: simulates a failure right after a reconfiguration.
///
/// Triggers only when the proposal is a reconfiguration suffix whose QC parent block
/// has no reconfiguration, and the proposal round directly follows the certified round.
/// In that case the proposal is sent to the first half of the ordered validators and an
/// error is returned; otherwise this is a no-op.
///
/// # Errors
/// Returns the injected error when the conditions above hold.
#[cfg(feature = "failpoints")]
async fn attempt_to_inject_reconfiguration_error(
    &self,
    proposal_msg: &ProposalMsg,
) -> anyhow::Result<()> {
    let block_data = proposal_msg.proposal().block_data();
    let direct_suffix = block_data.is_reconfiguration_suffix()
        && !block_data
            .quorum_cert()
            .parent_block()
            .has_reconfiguration();
    let continuous_round =
        block_data.round() == block_data.quorum_cert().certified_block().round() + 1;

    if !(direct_suffix && continuous_round) {
        return Ok(());
    }

    let mut half_peers: Vec<_> = self
        .epoch_state
        .verifier
        .get_ordered_account_addresses_iter()
        .collect();
    let keep = half_peers.len() / 2;
    half_peers.truncate(keep);

    let mut network = self.network.clone();
    network
        .send_proposal(proposal_msg.clone(), half_peers)
        .await;
    Err(anyhow::anyhow!("Injected error in reconfiguration suffix"))
}
}