use std::cmp;
use std::collections::HashMap;
use std::ops::Add;
use std::sync::Arc;
use std::time::Duration;
use node_data::StepName;
use node_data::bls::PublicKeyBytes;
use node_data::ledger::Seed;
use node_data::message::{Message, Topics};
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tracing::debug;
use crate::commons::{Database, TimeoutSet};
use crate::config::{
MAX_STEP_TIMEOUT, TIMEOUT_INCREASE, exclude_next_generator,
};
use crate::msg_handler::{MsgHandler, StepOutcome};
use crate::user::committee::Committee;
use crate::user::provisioners::Provisioners;
use crate::user::sortition;
use crate::{proposal, ratification, validation};
#[derive(Default)]
pub struct RoundCommittees {
committees: HashMap<u8, Committee>,
}
impl RoundCommittees {
pub(crate) fn get_committee(&self, step: u8) -> Option<&Committee> {
self.committees.get(&step)
}
pub(crate) fn get_generator(&self, iter: u8) -> Option<PublicKeyBytes> {
let step = StepName::Proposal.to_step(iter);
self.get_committee(step)
.and_then(|c| c.iter().next().map(|p| *p.bytes()))
}
pub(crate) fn get_validation_committee(
&self,
iter: u8,
) -> Option<&Committee> {
let step = StepName::Validation.to_step(iter);
self.get_committee(step)
}
pub(crate) fn insert(&mut self, step: u8, committee: Committee) {
self.committees.insert(step, committee);
}
}
pub struct IterationCtx<DB: Database> {
validation_handler: Arc<Mutex<validation::handler::ValidationHandler<DB>>>,
ratification_handler:
Arc<Mutex<ratification::handler::RatificationHandler>>,
proposal_handler: Arc<Mutex<proposal::handler::ProposalHandler<DB>>>,
pub join_set: JoinSet<()>,
round: u64,
iter: u8,
pub(crate) committees: RoundCommittees,
timeouts: TimeoutSet,
}
impl<DB: Database> IterationCtx<DB> {
pub fn new(
round: u64,
iter: u8,
validation_handler: Arc<
Mutex<validation::handler::ValidationHandler<DB>>,
>,
ratification_handler: Arc<
Mutex<ratification::handler::RatificationHandler>,
>,
proposal_handler: Arc<Mutex<proposal::handler::ProposalHandler<DB>>>,
timeouts: TimeoutSet,
) -> Self {
Self {
round,
join_set: JoinSet::new(),
iter,
validation_handler,
ratification_handler,
committees: Default::default(),
timeouts,
proposal_handler,
}
}
pub(crate) fn on_begin(&mut self, iter: u8) {
self.iter = iter;
}
pub(crate) fn on_close(&mut self) {
debug!(
event = "iter completed",
len = self.join_set.len(),
round = self.round,
iter = self.iter,
);
self.join_set.abort_all();
}
pub(crate) fn on_timeout_event(&mut self, step_name: StepName) {
let curr_step_timeout =
self.timeouts.get_mut(&step_name).expect("valid timeout");
*curr_step_timeout =
cmp::min(MAX_STEP_TIMEOUT, curr_step_timeout.add(TIMEOUT_INCREASE));
}
pub(crate) fn get_timeout(&self, step_name: StepName) -> Duration {
*self
.timeouts
.get(&step_name)
.expect("valid timeout per step")
}
fn get_sortition_config(
&self,
seed: Seed,
step_name: StepName,
exclusion: Vec<PublicKeyBytes>,
) -> sortition::Config {
sortition::Config::new(
seed, self.round, self.iter, step_name, exclusion,
)
}
pub(crate) fn generate_committee(
&mut self,
iteration: u8,
step_name: StepName,
provisioners: &Provisioners,
seed: Seed,
) {
let step = step_name.to_step(iteration);
if self.committees.get_committee(step).is_some() {
return;
}
if step_name != StepName::Proposal && exclude_next_generator(iteration)
{
let prop = StepName::Proposal;
let next_prop_step = prop.to_step(iteration + 1);
if self.committees.get_committee(next_prop_step).is_none() {
let mut next_cfg =
self.get_sortition_config(seed, prop, vec![]);
next_cfg.step = next_prop_step;
let next_generator = Committee::new(provisioners, &next_cfg);
debug!(
event = "committee_generated",
step = next_cfg.step,
config = ?next_cfg,
members = format!("{}", &next_generator)
);
self.committees.insert(next_prop_step, next_generator);
}
}
let exclusion = match step_name {
StepName::Proposal => vec![],
_ => {
let mut exclusion_list = vec![];
let cur_generator = self
.get_generator(iteration)
.expect("Proposal committee to be already generated");
exclusion_list.push(cur_generator);
if exclude_next_generator(iteration) {
let next_generator =
self.get_generator(iteration + 1).expect(
"Next Proposal committee to be already generated",
);
exclusion_list.push(next_generator);
}
exclusion_list
}
};
let sortition_step = step_name.to_step(iteration);
let mut config_step =
self.get_sortition_config(seed, step_name, exclusion);
config_step.step = sortition_step;
let step_committee = Committee::new(provisioners, &config_step);
debug!(
event = "committee_generated",
step = config_step.step,
config = ?config_step,
members = format!("{}", &step_committee)
);
self.committees.insert(step, step_committee);
}
pub(crate) fn generate_iteration_committees(
&mut self,
iteration: u8,
provisioners: &Provisioners,
seed: Seed,
) {
let stepnames = [
StepName::Proposal,
StepName::Validation,
StepName::Ratification,
];
for stepname in &stepnames {
self.generate_committee(iteration, *stepname, provisioners, seed);
}
}
pub(crate) fn get_generator(&self, iter: u8) -> Option<PublicKeyBytes> {
let step = StepName::Proposal.to_step(iter);
self.committees
.get_committee(step)
.and_then(|c| c.iter().next())
.map(|p| *p.bytes())
}
pub(crate) async fn process_past_msg(
&self,
msg: Message,
) -> Option<Message> {
let committee = self.committees.get_committee(msg.get_step())?;
let generator = self.get_generator(msg.header.iteration);
match msg.topic() {
Topics::Candidate => {
let mut proposal = self.proposal_handler.lock().await;
if let Ok(StepOutcome::Ready(m)) =
proposal.collect_from_past(msg, committee, generator).await
{
return Some(m);
}
}
Topics::Validation | Topics::ValidationQuorum => {
let mut validation = self.validation_handler.lock().await;
if let Ok(StepOutcome::Ready(m)) = validation
.collect_from_past(msg, committee, generator)
.await
{
return Some(m);
}
}
Topics::Ratification => {
let mut ratification = self.ratification_handler.lock().await;
if let Ok(StepOutcome::Ready(m)) = ratification
.collect_from_past(msg, committee, generator)
.await
{
return Some(m);
}
}
_ => {}
};
None
}
}
impl<DB: Database> Drop for IterationCtx<DB> {
fn drop(&mut self) {
self.on_close();
}
}