use std::cmp;
use std::sync::Arc;
use std::time::Duration;
use node_data::get_current_timestamp;
use node_data::ledger::IterationsInfo;
use node_data::message::Message;
use tokio::sync::Mutex;
use tracing::{debug, error, warn};
use crate::commons::Database;
use crate::config;
use crate::config::MINIMUM_BLOCK_TIME;
use crate::errors::{OperationError, StateTransitionError};
use crate::execution_ctx::ExecutionCtx;
use crate::msg_handler::{MsgHandler, StepOutcome};
use crate::operations::Operations;
use crate::proposal::block_generator::Generator;
use crate::proposal::handler;
pub struct ProposalStep<T, D: Database>
where
T: Operations,
{
handler: Arc<Mutex<handler::ProposalHandler<D>>>,
bg: Generator<T>,
}
impl<T: Operations + 'static, D: Database> ProposalStep<T, D> {
pub fn new(
executor: Arc<T>,
_db: Arc<Mutex<D>>,
handler: Arc<Mutex<handler::ProposalHandler<D>>>,
) -> Self {
Self {
handler,
bg: Generator::new(executor),
}
}
pub async fn reinitialize(
&mut self,
_msg: Message,
round: u64,
iteration: u8,
) {
debug!(event = "init", name = self.name(), round, iter = iteration,)
}
pub async fn run(&mut self, mut ctx: ExecutionCtx<'_, T, D>) -> Message {
let committee = ctx
.get_current_committee()
.expect("committee to be created before run");
let tip_timestamp = ctx.round_update.timestamp();
if ctx.am_member(committee) {
let iteration =
cmp::min(config::RELAX_ITERATION_THRESHOLD, ctx.iteration);
let failed_attestations =
ctx.att_registry.lock().await.get_failed_atts(iteration);
match self
.bg
.generate_candidate_message(
&ctx.round_update,
ctx.iteration,
IterationsInfo::new(failed_attestations),
)
.await
{
Ok(msg) => {
debug!(
event = "send message",
src = "proposal",
msg_topic = ?msg.topic(),
info = ?msg.header,
ray_id = msg.ray_id()
);
ctx.outbound.try_send(msg.clone());
match self
.handler
.lock()
.await
.collect(
msg,
&ctx.round_update,
committee,
None,
&ctx.iter_ctx.committees,
)
.await
{
Ok(StepOutcome::Ready(msg)) => {
Self::wait_until_next_slot(tip_timestamp).await;
return msg;
}
Err(err) => {
error!(event = "Failed to store candidate", ?err)
}
_ => {}
};
}
Err(err) => match err {
OperationError::FailedTransitionCreation(
StateTransitionError::TipChanged,
) => {
warn!(
event = "Stopped block generation",
round = ctx.round_update.round,
iteration = ctx.iteration,
%err,
);
}
_ => {
error!(
event = "Failed to generate candidate block",
round = ctx.round_update.round,
iteration = ctx.iteration,
%err,
)
}
},
}
}
let additional_timeout = Self::next_slot_in(tip_timestamp);
let msg = match ctx.handle_future_msgs(self.handler.clone()).await {
StepOutcome::Ready(m) => m,
StepOutcome::Pending => {
ctx.event_loop(self.handler.clone(), additional_timeout)
.await
}
};
Self::wait_until_next_slot(tip_timestamp).await;
msg
}
async fn wait_until_next_slot(tip_timestamp: u64) {
if let Some(delay) = Self::next_slot_in(tip_timestamp) {
debug!(event = "Wait next block slot for validation", ?delay);
tokio::time::sleep(delay).await;
}
}
fn next_slot_in(tip_timestamp: u64) -> Option<Duration> {
let current_time_secs = get_current_timestamp();
let next_slot_timestamp = tip_timestamp + *MINIMUM_BLOCK_TIME;
if current_time_secs >= next_slot_timestamp {
None
} else {
Some(Duration::from_secs(next_slot_timestamp - current_time_secs))
}
}
pub fn name(&self) -> &'static str {
"proposal"
}
}