use std::sync::Arc;
use async_trait::async_trait;
use node_data::StepName;
use node_data::bls::PublicKeyBytes;
use node_data::ledger::{Block, StepVotes, to_str};
use node_data::message::payload::{
GetResource, Inv, QuorumType, Validation, Vote,
};
use node_data::message::{
ConsensusHeader, Message, Payload, SignedStepMessage, StepMessage, payload,
};
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
use crate::aggregator::{Aggregator, StepVote};
use crate::commons::{Database, RoundUpdate};
use crate::config::is_emergency_iter;
use crate::errors::ConsensusError;
use crate::iteration_ctx::RoundCommittees;
use crate::msg_handler::{MsgHandler, StepOutcome};
use crate::step_votes_reg::SafeAttestationInfoRegistry;
use crate::user::committee::Committee;
pub struct ValidationHandler<D: Database> {
pub(crate) aggr: Aggregator<Validation>,
pub(crate) candidate: Option<Block>,
att_registry: SafeAttestationInfoRegistry,
curr_iteration: u8,
pub(crate) db: Arc<Mutex<D>>,
}
impl StepVote for Validation {
fn vote(&self) -> &Vote {
&self.vote
}
}
pub fn verify_stateless(
msg: &Message,
round_committees: &RoundCommittees,
) -> Result<(), ConsensusError> {
match &msg.payload {
Payload::Validation(p) => {
p.verify_signature()?;
let signer = &p.sign_info.signer;
let committee = round_committees
.get_committee(msg.get_step())
.expect("committee to be created before run");
committee
.votes_for(signer)
.ok_or(ConsensusError::NotCommitteeMember)?;
}
Payload::Empty => (),
_ => {
info!("cannot verify in validation handler");
Err(ConsensusError::InvalidMsgType)?
}
}
Ok(())
}
impl<D: Database> ValidationHandler<D> {
pub(crate) fn new(
att_registry: SafeAttestationInfoRegistry,
db: Arc<Mutex<D>>,
) -> Self {
Self {
att_registry,
aggr: Aggregator::default(),
candidate: None,
curr_iteration: 0,
db,
}
}
pub(crate) fn reset(&mut self, curr_iteration: u8) {
self.candidate = None;
self.curr_iteration = curr_iteration;
}
fn unwrap_msg(msg: Message) -> Result<Validation, ConsensusError> {
match msg.payload {
Payload::Validation(r) => Ok(r),
_ => Err(ConsensusError::InvalidMsgType),
}
}
async fn build_validation_result(
&self,
step_votes: StepVotes,
vote: Vote,
quorum: QuorumType,
consensus_header: &ConsensusHeader,
) -> Message {
let vr = payload::ValidationResult::new(step_votes, vote, quorum);
if is_emergency_iter(consensus_header.iteration) {
debug!(
event = "Store ValidationResult",
info = ?consensus_header,
src = "Validation"
);
self.db
.lock()
.await
.store_validation_result(consensus_header, &vr)
.await;
}
Message::from(vr)
}
}
#[async_trait]
impl<D: Database> MsgHandler for ValidationHandler<D> {
fn verify(
&self,
msg: &Message,
_round_committees: &RoundCommittees,
) -> Result<(), ConsensusError> {
match &msg.payload {
Payload::Validation(p) => {
if self.aggr.is_vote_collected(p) {
return Err(ConsensusError::VoteAlreadyCollected);
}
p.verify_signature()?
}
Payload::Empty => (),
_ => Err(ConsensusError::InvalidMsgType)?,
};
Ok(())
}
async fn collect(
&mut self,
msg: Message,
_ru: &RoundUpdate,
committee: &Committee,
generator: Option<PublicKeyBytes>,
_round_committees: &RoundCommittees,
) -> Result<StepOutcome, ConsensusError> {
let p = Self::unwrap_msg(msg)?;
if p.vote == Vote::NoQuorum {
return Err(ConsensusError::InvalidVote(p.vote));
}
let iteration = p.header().iteration;
if iteration != self.curr_iteration {
return Err(ConsensusError::InvalidMsgIteration(iteration));
}
let (step_votes, quorum_reached) =
self.aggr.collect_vote(committee, &p).map_err(|error| {
warn!(
event = "Cannot collect vote",
?error,
from = p.sign_info().signer.to_bs58(),
vote = ?p.vote,
msg_step = p.get_step(),
msg_iter = p.header().iteration,
msg_height = p.header().round,
);
ConsensusError::InvalidVote(p.vote)
})?;
_ = self.att_registry.lock().await.set_step_votes(
iteration,
&p.vote,
step_votes,
StepName::Validation,
quorum_reached,
&generator.expect("There must be a valid generator"),
);
if quorum_reached {
let vote = p.vote;
let quorum_type = match vote {
Vote::NoCandidate => QuorumType::NoCandidate,
Vote::Invalid(_) => QuorumType::Invalid,
Vote::Valid(_) => QuorumType::Valid,
Vote::NoQuorum => {
return Err(ConsensusError::InvalidVote(vote));
}
};
let vrmsg = self
.build_validation_result(
step_votes,
vote,
quorum_type,
&p.header(),
)
.await;
return Ok(StepOutcome::Ready(vrmsg));
}
Ok(StepOutcome::Pending)
}
async fn collect_from_past(
&mut self,
msg: Message,
committee: &Committee,
generator: Option<PublicKeyBytes>,
) -> Result<StepOutcome, ConsensusError> {
if is_emergency_iter(msg.header.iteration) {
if let Payload::ValidationQuorum(vq) = msg.payload {
if !vq.result.vote().is_valid() {
return Err(ConsensusError::InvalidMsgType);
};
let vr = vq.result;
debug!(
event = "Store ValidationResult",
info = ?vq.header,
src = "ValidationQuorum"
);
self.db
.lock()
.await
.store_validation_result(&vq.header, &vr)
.await;
let vr_msg = vr.into();
return Ok(StepOutcome::Ready(vr_msg));
}
}
let p = Self::unwrap_msg(msg)?;
if p.vote == Vote::NoQuorum {
return Err(ConsensusError::InvalidVote(p.vote));
}
let collect_vote = self.aggr.collect_vote(committee, &p);
match collect_vote {
Ok((step_votes, validation_quorum_reached)) => {
let _ = self.att_registry.lock().await.set_step_votes(
p.header().iteration,
&p.vote,
step_votes,
StepName::Validation,
validation_quorum_reached,
&generator.expect("There must be a valid generator"),
);
if p.vote.is_valid() && validation_quorum_reached {
let vr = self
.build_validation_result(
step_votes,
p.vote,
QuorumType::Valid,
&p.header(),
)
.await;
return Ok(StepOutcome::Ready(vr));
}
}
Err(error) => {
warn!(
event = "Cannot collect vote",
?error,
from = p.sign_info().signer.to_bs58(),
vote = ?p.vote,
msg_step = p.get_step(),
msg_iter = p.header().iteration,
msg_height = p.header().round,
);
}
}
Ok(StepOutcome::Pending)
}
fn handle_timeout(
&self,
ru: &RoundUpdate,
curr_iteration: u8,
) -> Option<Message> {
if is_emergency_iter(curr_iteration) {
let prev_block_hash = ru.hash();
let round = ru.round;
debug!(
event = "Request ValidationResult",
round,
iteration = curr_iteration,
prev_block = to_str(&prev_block_hash)
);
let mut inv = Inv::new(1);
inv.add_validation_result(ConsensusHeader {
prev_block_hash,
round,
iteration: curr_iteration,
});
let msg = GetResource::new(inv, None, u64::MAX, 0);
return Some(msg.into());
}
None
}
}