dusk_consensus/proposal/
step.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7use std::cmp;
8use std::sync::Arc;
9use std::time::Duration;
10
11use node_data::get_current_timestamp;
12use node_data::ledger::IterationsInfo;
13use node_data::message::Message;
14use tokio::sync::Mutex;
15use tracing::{debug, error, warn};
16
17use crate::commons::Database;
18use crate::config;
19use crate::config::MINIMUM_BLOCK_TIME;
20use crate::errors::{OperationError, StateTransitionError};
21use crate::execution_ctx::ExecutionCtx;
22use crate::msg_handler::{MsgHandler, StepOutcome};
23use crate::operations::Operations;
24use crate::proposal::block_generator::Generator;
25use crate::proposal::handler;
26
27pub struct ProposalStep<T, D: Database>
28where
29    T: Operations,
30{
31    handler: Arc<Mutex<handler::ProposalHandler<D>>>,
32    bg: Generator<T>,
33}
34
35impl<T: Operations + 'static, D: Database> ProposalStep<T, D> {
36    pub fn new(
37        executor: Arc<T>,
38        _db: Arc<Mutex<D>>,
39        handler: Arc<Mutex<handler::ProposalHandler<D>>>,
40    ) -> Self {
41        Self {
42            handler,
43            bg: Generator::new(executor),
44        }
45    }
46
47    pub async fn reinitialize(
48        &mut self,
49        _msg: Message,
50        round: u64,
51        iteration: u8,
52    ) {
53        debug!(event = "init", name = self.name(), round, iter = iteration,)
54    }
55
56    pub async fn run(&mut self, mut ctx: ExecutionCtx<'_, T, D>) -> Message {
57        let committee = ctx
58            .get_current_committee()
59            .expect("committee to be created before run");
60
61        let tip_timestamp = ctx.round_update.timestamp();
62
63        if ctx.am_member(committee) {
64            let iteration =
65                cmp::min(config::RELAX_ITERATION_THRESHOLD, ctx.iteration);
66
67            // Fetch failed attestations from att_registry
68            let failed_attestations =
69                ctx.att_registry.lock().await.get_failed_atts(iteration);
70
71            match self
72                .bg
73                .generate_candidate_message(
74                    &ctx.round_update,
75                    ctx.iteration,
76                    IterationsInfo::new(failed_attestations),
77                )
78                .await
79            {
80                Ok(msg) => {
81                    debug!(
82                        event = "send message",
83                        src = "proposal",
84                        msg_topic = ?msg.topic(),
85                        info = ?msg.header,
86                        ray_id = msg.ray_id()
87                    );
88                    ctx.outbound.try_send(msg.clone());
89
90                    // register new candidate in local state
91                    match self
92                        .handler
93                        .lock()
94                        .await
95                        .collect(
96                            msg,
97                            &ctx.round_update,
98                            committee,
99                            None,
100                            &ctx.iter_ctx.committees,
101                        )
102                        .await
103                    {
104                        Ok(StepOutcome::Ready(msg)) => {
105                            Self::wait_until_next_slot(tip_timestamp).await;
106                            return msg;
107                        }
108                        Err(err) => {
109                            error!(event = "Failed to store candidate", ?err)
110                        }
111                        _ => {}
112                    };
113                }
114
115                Err(err) => match err {
116                    OperationError::FailedTransitionCreation(
117                        StateTransitionError::TipChanged,
118                    ) => {
119                        warn!(
120                          event = "Stopped block generation",
121                          round = ctx.round_update.round,
122                          iteration = ctx.iteration,
123                          %err,
124                        );
125                    }
126                    _ => {
127                        error!(
128                          event = "Failed to generate candidate block",
129                          round = ctx.round_update.round,
130                          iteration = ctx.iteration,
131                          %err,
132                        )
133                    }
134                },
135            }
136        }
137
138        let additional_timeout = Self::next_slot_in(tip_timestamp);
139        let msg = match ctx.handle_future_msgs(self.handler.clone()).await {
140            StepOutcome::Ready(m) => m,
141            StepOutcome::Pending => {
142                ctx.event_loop(self.handler.clone(), additional_timeout)
143                    .await
144            }
145        };
146        Self::wait_until_next_slot(tip_timestamp).await;
147        msg
148    }
149
150    /// Waits until the next slot is reached
151    async fn wait_until_next_slot(tip_timestamp: u64) {
152        if let Some(delay) = Self::next_slot_in(tip_timestamp) {
153            debug!(event = "Wait next block slot for validation", ?delay);
154            tokio::time::sleep(delay).await;
155        }
156    }
157
158    /// Calculate the duration needed to the next slot
159    fn next_slot_in(tip_timestamp: u64) -> Option<Duration> {
160        let current_time_secs = get_current_timestamp();
161
162        let next_slot_timestamp = tip_timestamp + *MINIMUM_BLOCK_TIME;
163        if current_time_secs >= next_slot_timestamp {
164            None
165        } else {
166            // block_timestamp - localtime
167            Some(Duration::from_secs(next_slot_timestamp - current_time_secs))
168        }
169    }
170
171    pub fn name(&self) -> &'static str {
172        "proposal"
173    }
174}