dusk_consensus/proposal/
step.rs1use std::cmp;
8use std::sync::Arc;
9use std::time::Duration;
10
11use node_data::get_current_timestamp;
12use node_data::ledger::IterationsInfo;
13use node_data::message::Message;
14use tokio::sync::Mutex;
15use tracing::{debug, error, warn};
16
17use crate::commons::Database;
18use crate::config;
19use crate::config::MINIMUM_BLOCK_TIME;
20use crate::errors::{OperationError, StateTransitionError};
21use crate::execution_ctx::ExecutionCtx;
22use crate::msg_handler::{MsgHandler, StepOutcome};
23use crate::operations::Operations;
24use crate::proposal::block_generator::Generator;
25use crate::proposal::handler;
26
27pub struct ProposalStep<T, D: Database>
28where
29 T: Operations,
30{
31 handler: Arc<Mutex<handler::ProposalHandler<D>>>,
32 bg: Generator<T>,
33}
34
35impl<T: Operations + 'static, D: Database> ProposalStep<T, D> {
36 pub fn new(
37 executor: Arc<T>,
38 _db: Arc<Mutex<D>>,
39 handler: Arc<Mutex<handler::ProposalHandler<D>>>,
40 ) -> Self {
41 Self {
42 handler,
43 bg: Generator::new(executor),
44 }
45 }
46
47 pub async fn reinitialize(
48 &mut self,
49 _msg: Message,
50 round: u64,
51 iteration: u8,
52 ) {
53 debug!(event = "init", name = self.name(), round, iter = iteration,)
54 }
55
56 pub async fn run(&mut self, mut ctx: ExecutionCtx<'_, T, D>) -> Message {
57 let committee = ctx
58 .get_current_committee()
59 .expect("committee to be created before run");
60
61 let tip_timestamp = ctx.round_update.timestamp();
62
63 if ctx.am_member(committee) {
64 let iteration =
65 cmp::min(config::RELAX_ITERATION_THRESHOLD, ctx.iteration);
66
67 let failed_attestations =
69 ctx.att_registry.lock().await.get_failed_atts(iteration);
70
71 match self
72 .bg
73 .generate_candidate_message(
74 &ctx.round_update,
75 ctx.iteration,
76 IterationsInfo::new(failed_attestations),
77 )
78 .await
79 {
80 Ok(msg) => {
81 debug!(
82 event = "send message",
83 src = "proposal",
84 msg_topic = ?msg.topic(),
85 info = ?msg.header,
86 ray_id = msg.ray_id()
87 );
88 ctx.outbound.try_send(msg.clone());
89
90 match self
92 .handler
93 .lock()
94 .await
95 .collect(
96 msg,
97 &ctx.round_update,
98 committee,
99 None,
100 &ctx.iter_ctx.committees,
101 )
102 .await
103 {
104 Ok(StepOutcome::Ready(msg)) => {
105 Self::wait_until_next_slot(tip_timestamp).await;
106 return msg;
107 }
108 Err(err) => {
109 error!(event = "Failed to store candidate", ?err)
110 }
111 _ => {}
112 };
113 }
114
115 Err(err) => match err {
116 OperationError::FailedTransitionCreation(
117 StateTransitionError::TipChanged,
118 ) => {
119 warn!(
120 event = "Stopped block generation",
121 round = ctx.round_update.round,
122 iteration = ctx.iteration,
123 %err,
124 );
125 }
126 _ => {
127 error!(
128 event = "Failed to generate candidate block",
129 round = ctx.round_update.round,
130 iteration = ctx.iteration,
131 %err,
132 )
133 }
134 },
135 }
136 }
137
138 let additional_timeout = Self::next_slot_in(tip_timestamp);
139 let msg = match ctx.handle_future_msgs(self.handler.clone()).await {
140 StepOutcome::Ready(m) => m,
141 StepOutcome::Pending => {
142 ctx.event_loop(self.handler.clone(), additional_timeout)
143 .await
144 }
145 };
146 Self::wait_until_next_slot(tip_timestamp).await;
147 msg
148 }
149
150 async fn wait_until_next_slot(tip_timestamp: u64) {
152 if let Some(delay) = Self::next_slot_in(tip_timestamp) {
153 debug!(event = "Wait next block slot for validation", ?delay);
154 tokio::time::sleep(delay).await;
155 }
156 }
157
158 fn next_slot_in(tip_timestamp: u64) -> Option<Duration> {
160 let current_time_secs = get_current_timestamp();
161
162 let next_slot_timestamp = tip_timestamp + *MINIMUM_BLOCK_TIME;
163 if current_time_secs >= next_slot_timestamp {
164 None
165 } else {
166 Some(Duration::from_secs(next_slot_timestamp - current_time_secs))
168 }
169 }
170
171 pub fn name(&self) -> &'static str {
172 "proposal"
173 }
174}