use async_trait::async_trait;
use node_data::StepName;
use node_data::bls::PublicKeyBytes;
use node_data::ledger::Attestation;
use node_data::message::payload::{
Quorum, Ratification, ValidationResult, Vote,
};
use node_data::message::{
ConsensusHeader, Message, Payload, SignedStepMessage, StepMessage,
};
use tracing::{debug, error, info, warn};
use crate::aggregator::{Aggregator, StepVote};
use crate::commons::RoundUpdate;
use crate::config::is_emergency_iter;
use crate::errors::ConsensusError;
use crate::iteration_ctx::RoundCommittees;
use crate::msg_handler::{MsgHandler, StepOutcome};
use crate::quorum::verifiers::verify_quorum_votes;
use crate::step_votes_reg::SafeAttestationInfoRegistry;
use crate::user::committee::Committee;
/// State kept by the message handler of the Ratification step.
pub struct RatificationHandler {
/// Registry that receives the aggregated step votes (through
/// `set_step_votes`) and may hand back an attestation.
pub(crate) att_registry: SafeAttestationInfoRegistry,
/// Aggregator accumulating the collected `Ratification` votes.
pub(crate) aggregator: Aggregator<Ratification>,
/// Locally held validation result; replaced by `reset` and
/// `update_validation_result`.
validation_result: ValidationResult,
/// Iteration whose votes `collect` accepts; set by `reset`.
pub(crate) curr_iteration: u8,
}
/// Lets a `Ratification` message be handled through the `StepVote` trait by
/// exposing the vote it carries.
impl StepVote for Ratification {
/// Returns a reference to the `vote` field of the message.
fn vote(&self) -> &Vote {
&self.vote
}
}
impl RatificationHandler {
pub fn verify_stateless(
msg: &Message,
round_committees: &RoundCommittees,
) -> Result<(), ConsensusError> {
match &msg.payload {
Payload::Ratification(p) => {
p.verify_signature()?;
let signer = &p.sign_info.signer;
let committee = round_committees
.get_committee(msg.get_step())
.expect("committee to be created before run");
committee
.votes_for(signer)
.ok_or(ConsensusError::NotCommitteeMember)?;
}
Payload::ValidationQuorum(q) => Self::verify_validation_result(
&q.header,
&q.result,
round_committees,
)?,
Payload::Empty => (),
_ => {
info!("cannot verify in validation handler");
Err(ConsensusError::InvalidMsgType)?
}
}
Ok(())
}
}
#[async_trait]
impl MsgHandler for RatificationHandler {
    /// Checks a message before it is collected.
    ///
    /// Only `Payload::Ratification` is accepted: the vote must not already
    /// be known to the aggregator and its signature must verify.
    ///
    /// # Errors
    ///
    /// - [`ConsensusError::VoteAlreadyCollected`] if the aggregator reports
    ///   the vote as collected.
    /// - [`ConsensusError::InvalidMsgType`] for any other payload.
    /// - The signature verification error, if any.
    fn verify(
        &self,
        msg: &Message,
        _round_committees: &RoundCommittees,
    ) -> Result<(), ConsensusError> {
        match &msg.payload {
            Payload::Ratification(ratification) => {
                if self.aggregator.is_vote_collected(ratification) {
                    return Err(ConsensusError::VoteAlreadyCollected);
                }

                ratification.verify_signature()?;
                Ok(())
            }
            _ => Err(ConsensusError::InvalidMsgType),
        }
    }

    /// Collects a Ratification vote for the iteration currently handled.
    ///
    /// The message is rejected when its iteration differs from
    /// `curr_iteration` or when its vote disagrees with the validation
    /// result it embeds. A vote other than `Vote::NoQuorum` is then compared
    /// with the local validation result (see the inline comments). Finally
    /// the vote is aggregated and the outcome is stored in the attestation
    /// registry.
    ///
    /// Returns `StepOutcome::Ready` with a `Quorum` message when the registry
    /// yields an attestation, `StepOutcome::Pending` otherwise.
    ///
    /// # Errors
    ///
    /// - [`ConsensusError::InvalidMsgType`] if the payload is not a
    ///   ratification.
    /// - [`ConsensusError::InvalidMsgIteration`] on an iteration mismatch.
    /// - [`ConsensusError::VoteMismatch`] if the vote differs from the
    ///   embedded or (outside the adoption cases) the local validation
    ///   result.
    /// - [`ConsensusError::InvalidVote`] if the aggregator refuses the vote.
    /// - Errors from verifying the embedded validation result.
    ///
    /// # Panics
    ///
    /// Panics if `generator` is `None` once the vote has been aggregated.
    async fn collect(
        &mut self,
        msg: Message,
        ru: &RoundUpdate,
        committee: &Committee,
        generator: Option<PublicKeyBytes>,
        round_committees: &RoundCommittees,
    ) -> Result<StepOutcome, ConsensusError> {
        let ratification = Self::unwrap_msg(msg)?;
        let vote = ratification.vote;
        let iteration = ratification.header().iteration;

        if iteration != self.curr_iteration {
            return Err(ConsensusError::InvalidMsgIteration(iteration));
        }

        // The vote must match the validation result shipped with it
        Self::verify_msg_validation_result(&ratification).map_err(|err| {
            warn!(
                event = "Vote discarded",
                step = "Ratification",
                reason = "mismatch with msg ValidationResult",
                round = ru.round,
                iter = iteration
            );
            err
        })?;

        if vote != Vote::NoQuorum {
            let local_vote = *self.validation_result().vote();

            if vote != local_vote {
                // A differing vote may replace the local result only when
                // the local result is NoQuorum or `is_emergency_iter` holds
                // for this iteration; otherwise the vote is discarded.
                let may_adopt = local_vote == Vote::NoQuorum
                    || is_emergency_iter(iteration);

                if !may_adopt {
                    warn!(
                        event = "Vote discarded",
                        step = "Ratification",
                        reason = "mismatch with local ValidationResult",
                        round = ru.round,
                        iter = iteration
                    );
                    return Err(ConsensusError::VoteMismatch(
                        local_vote, vote,
                    ));
                }

                // The embedded result is verified before it is adopted
                Self::verify_validation_result(
                    &ratification.header,
                    &ratification.validation_result,
                    round_committees,
                )?;
                self.update_validation_result(
                    ratification.validation_result.clone(),
                );
            }
        }

        let (step_votes, quorum_reached) =
            match self.aggregator.collect_vote(committee, &ratification) {
                Ok(collected) => collected,
                Err(error) => {
                    warn!(
                        event = "Cannot collect vote",
                        ?error,
                        from = ratification.sign_info().signer.to_bs58(),
                        ?vote,
                        msg_step = ratification.get_step(),
                        msg_iter = ratification.header().iteration,
                        msg_height = ratification.header().round,
                    );
                    return Err(ConsensusError::InvalidVote(vote));
                }
            };

        // The registry lock stays held while the matched arm runs
        let outcome = match self.att_registry.lock().await.set_step_votes(
            iteration,
            &vote,
            step_votes,
            StepName::Ratification,
            quorum_reached,
            &generator.expect("There must be a valid generator"),
        ) {
            Some(attestation) => {
                let header = ConsensusHeader {
                    prev_block_hash: ru.hash(),
                    round: ru.round,
                    iteration,
                };
                StepOutcome::Ready(Self::build_quorum(header, attestation))
            }
            None => StepOutcome::Pending,
        };

        Ok(outcome)
    }

    /// Collects a Ratification vote without the iteration and local
    /// validation-result checks performed by `collect`.
    ///
    /// The vote must still match the validation result embedded in the
    /// message. A vote the aggregator refuses is only logged and yields
    /// `StepOutcome::Pending`. Otherwise the outcome is stored in the
    /// attestation registry and `StepOutcome::Ready` is returned if the
    /// registry yields an attestation.
    ///
    /// # Errors
    ///
    /// - [`ConsensusError::InvalidMsgType`] if the payload is not a
    ///   ratification.
    /// - The error from `verify_msg_validation_result` on a mismatch.
    ///
    /// # Panics
    ///
    /// Panics if `generator` is `None` once the vote has been aggregated.
    async fn collect_from_past(
        &mut self,
        msg: Message,
        committee: &Committee,
        generator: Option<PublicKeyBytes>,
    ) -> Result<StepOutcome, ConsensusError> {
        let ratification = Self::unwrap_msg(msg)?;

        Self::verify_msg_validation_result(&ratification).map_err(|err| {
            warn!(
                event = "Vote discarded",
                step = "Ratification",
                reason = "mismatch with msg ValidationResult",
                round = ratification.header.round,
                iter = ratification.header.iteration
            );
            err
        })?;

        let (step_votes, quorum_reached) =
            match self.aggregator.collect_vote(committee, &ratification) {
                Ok(collected) => collected,
                Err(error) => {
                    // A refused vote is not an error for the caller
                    warn!(
                        event = "Cannot collect vote",
                        ?error,
                        from = ratification.sign_info().signer.to_bs58(),
                        vote = ?ratification.vote,
                        msg_step = ratification.get_step(),
                        msg_iter = ratification.header().iteration,
                        msg_height = ratification.header().round,
                    );
                    return Ok(StepOutcome::Pending);
                }
            };

        // The registry lock stays held while the matched arm runs
        let outcome = match self.att_registry.lock().await.set_step_votes(
            ratification.header().iteration,
            &ratification.vote,
            step_votes,
            StepName::Ratification,
            quorum_reached,
            &generator.expect("There must be a valid generator"),
        ) {
            Some(attestation) => StepOutcome::Ready(Self::build_quorum(
                ratification.header(),
                attestation,
            )),
            None => StepOutcome::Pending,
        };

        Ok(outcome)
    }

    /// Produces no message on timeout: always returns `None`.
    fn handle_timeout(
        &self,
        _ru: &RoundUpdate,
        _curr_iteration: u8,
    ) -> Option<Message> {
        None
    }
}
impl RatificationHandler {
pub(crate) fn new(att_registry: SafeAttestationInfoRegistry) -> Self {
Self {
att_registry,
aggregator: Default::default(),
validation_result: Default::default(),
curr_iteration: 0,
}
}
fn build_quorum(header: ConsensusHeader, att: Attestation) -> Message {
let payload = Quorum { header, att };
payload.into()
}
pub(crate) fn reset(&mut self, iter: u8, validation: ValidationResult) {
self.validation_result = validation;
self.curr_iteration = iter;
}
pub(crate) fn validation_result(&self) -> &ValidationResult {
&self.validation_result
}
pub(crate) fn update_validation_result(&mut self, vr: ValidationResult) {
let cur_vote = *self.validation_result().vote();
let new_vote = vr.vote();
debug!(
"Update local ValidationResult ({:?}) with {:?}",
cur_vote, new_vote
);
self.validation_result = vr;
}
fn verify_msg_validation_result(
msg: &Ratification,
) -> Result<(), ConsensusError> {
let vote = msg.vote;
let vr_vote = *msg.validation_result.vote();
if vr_vote != vote {
return Err(ConsensusError::VoteMismatch(vr_vote, vote));
}
if vote == Vote::NoQuorum
&& !msg.validation_result.step_votes().is_empty()
{
return Err(ConsensusError::InvalidVote(vote));
}
Ok(())
}
fn unwrap_msg(msg: Message) -> Result<Ratification, ConsensusError> {
match msg.payload {
Payload::Ratification(r) => Ok(r),
_ => Err(ConsensusError::InvalidMsgType),
}
}
pub(crate) fn verify_validation_result(
header: &ConsensusHeader,
result: &ValidationResult,
round_committees: &RoundCommittees,
) -> Result<(), ConsensusError> {
let iter = header.iteration;
let validation_committee = round_committees
.get_validation_committee(iter)
.ok_or_else(|| {
error!("could not get validation committee");
ConsensusError::CommitteeNotGenerated
})?;
verify_quorum_votes(
header,
StepName::Validation,
result.vote(),
result.step_votes(),
validation_committee,
)?;
Ok(())
}
}