Skip to main content

arc_malachitebft_core_consensus/
state.rs

1use std::time::{Duration, Instant};
2use tracing::info;
3
4use malachitebft_core_driver::Driver;
5use malachitebft_core_types::*;
6
7use crate::full_proposal::{FullProposal, FullProposalKeeper};
8use crate::input::Input;
9use crate::params::Params;
10use crate::prelude::*;
11use crate::types::ProposedValue;
12use crate::util::bounded_queue::BoundedQueue;
13
14/// The state maintained by consensus for processing a [`Input`].
15pub struct State<Ctx>
16where
17    Ctx: Context,
18{
19    /// The context for the consensus state machine
20    pub ctx: Ctx,
21
22    /// The consensus parameters
23    pub params: Params<Ctx>,
24
25    /// Driver for the per-round consensus state machine
26    pub driver: Driver<Ctx>,
27
28    /// A queue of inputs that were received before the driver started.
29    pub input_queue: BoundedQueue<Ctx::Height, Input<Ctx>>,
30
31    /// The proposals to decide on.
32    pub full_proposal_keeper: FullProposalKeeper<Ctx>,
33
34    /// Last prevote broadcasted by this node
35    pub last_signed_prevote: Option<SignedVote<Ctx>>,
36
37    /// Last precommit broadcasted by this node
38    pub last_signed_precommit: Option<SignedVote<Ctx>>,
39
40    /// Target time for the current height
41    pub target_time: Option<Duration>,
42
43    /// Start time of the current height
44    pub height_start_time: Option<Instant>,
45
46    /// Whether we are in the finalization period.
47    ///
48    /// The finalization period is entered in decide, cleared in finalize_height,
49    /// and only valid during the commit step.
50    ///
51    /// It allows collecting additional precommits for the decided value after
52    /// the decision is made in decide, which can be included in the commit certificate.
53    pub finalization_period: bool,
54}
55
56impl<Ctx> State<Ctx>
57where
58    Ctx: Context,
59{
60    pub fn new(
61        ctx: Ctx,
62        height: Ctx::Height,
63        validator_set: Ctx::ValidatorSet,
64        params: Params<Ctx>,
65        queue_capacity: usize,
66    ) -> Self {
67        let driver = Driver::new(
68            ctx.clone(),
69            height,
70            validator_set,
71            params.address.clone(),
72            params.threshold_params,
73        );
74
75        Self {
76            ctx,
77            driver,
78            params,
79            input_queue: BoundedQueue::new(queue_capacity),
80            full_proposal_keeper: Default::default(),
81            last_signed_prevote: None,
82            last_signed_precommit: None,
83            target_time: None,
84            height_start_time: None,
85            finalization_period: false,
86        }
87    }
88
89    pub fn height(&self) -> Ctx::Height {
90        self.driver.height()
91    }
92
93    pub fn round(&self) -> Round {
94        self.driver.round()
95    }
96
97    pub fn address(&self) -> &Ctx::Address {
98        self.driver.address()
99    }
100
101    pub fn validator_set(&self) -> &Ctx::ValidatorSet {
102        self.driver.validator_set()
103    }
104
105    pub fn get_proposer(&self, height: Ctx::Height, round: Round) -> &Ctx::Address {
106        self.ctx
107            .select_proposer(self.validator_set(), height, round)
108            .address()
109    }
110
111    pub fn set_last_vote(&mut self, vote: SignedVote<Ctx>) {
112        match vote.vote_type() {
113            VoteType::Prevote => self.last_signed_prevote = Some(vote),
114            VoteType::Precommit => self.last_signed_precommit = Some(vote),
115        }
116    }
117
118    pub fn restore_precommits(
119        &self,
120        height: Ctx::Height,
121        round: Round,
122        value: &Ctx::Value,
123    ) -> Vec<SignedVote<Ctx>> {
124        assert_eq!(height, self.driver.height());
125        self.driver.restore_precommits(round, &value.id())
126    }
127
128    /// Get the polka certificate at the current height for the specified round and value, if it exists
129    pub fn polka_certificate(
130        &self,
131        round: Round,
132        value_id: &ValueId<Ctx>,
133    ) -> Option<&PolkaCertificate<Ctx>> {
134        self.driver.polka_certificate(round, value_id)
135    }
136
137    pub fn full_proposal_at_round_and_value(
138        &self,
139        height: &Ctx::Height,
140        round: Round,
141        value: &Ctx::Value,
142    ) -> Option<&FullProposal<Ctx>> {
143        self.full_proposal_keeper
144            .full_proposal_at_round_and_value(height, round, &value.id())
145    }
146
147    pub fn full_proposal_at_round_and_proposer(
148        &self,
149        height: &Ctx::Height,
150        round: Round,
151        address: &Ctx::Address,
152    ) -> Option<&FullProposal<Ctx>> {
153        self.full_proposal_keeper
154            .full_proposal_at_round_and_proposer(height, round, address)
155    }
156
157    /// Get a proposed value by its ID at the specified height and round.
158    pub fn get_proposed_value_by_id(
159        &self,
160        height: Ctx::Height,
161        round: Round,
162        value_id: &ValueId<Ctx>,
163    ) -> Option<ProposedValue<Ctx>> {
164        let (value, validity) = self
165            .full_proposal_keeper
166            .get_value_by_id(&height, round, value_id)?;
167        Some(ProposedValue {
168            height,
169            round,
170            valid_round: Round::Nil,
171            proposer: self.get_proposer(height, round).clone(),
172            value: value.clone(),
173            validity,
174        })
175    }
176
177    pub fn proposals_for_value(
178        &self,
179        proposed_value: &ProposedValue<Ctx>,
180    ) -> Vec<SignedProposal<Ctx>> {
181        self.full_proposal_keeper
182            .proposals_for_value(proposed_value)
183    }
184
185    pub fn store_proposal(&mut self, new_proposal: SignedProposal<Ctx>) {
186        self.full_proposal_keeper.store_proposal(new_proposal)
187    }
188
189    /// Store the proposed value and return its validity,
190    /// which may be now be different from the one provided.
191    pub fn store_value(&mut self, new_value: &ProposedValue<Ctx>) -> Validity {
192        // Values for higher height should have been cached for future processing
193        assert_eq!(new_value.height, self.driver.height());
194
195        if self
196            .full_proposal_keeper
197            .get_value(&new_value.height, new_value.round, &new_value.value)
198            .is_none()
199            && new_value.validity.is_invalid()
200        {
201            warn!(
202                height = %new_value.height,
203                round = %new_value.round,
204                value.id = ?new_value.value.id(),
205                "Application sent an invalid proposed value"
206            );
207        }
208        // Store the value at both round and valid_round
209        self.full_proposal_keeper.store_value(new_value);
210
211        // Retrieve the validity after storing, as it may have changed (e.g., from Invalid to Valid)
212        let (_value, validity) = self
213            .full_proposal_keeper
214            .get_value(&new_value.height, new_value.round, &new_value.value)
215            .expect("We just stored the entry, so it should be there");
216
217        validity
218    }
219
220    pub fn reset_and_start_height(
221        &mut self,
222        height: Ctx::Height,
223        validator_set: Ctx::ValidatorSet,
224        target_time: Option<Duration>,
225    ) {
226        self.full_proposal_keeper.clear();
227        self.last_signed_prevote = None;
228        self.last_signed_precommit = None;
229        self.target_time = target_time;
230        self.height_start_time = Some(Instant::now());
231        self.finalization_period = false;
232
233        self.driver.move_to_height(height, validator_set);
234    }
235
236    /// Return the round and value id of the decided value.
237    pub fn decided_value(&self) -> Option<(Round, Ctx::Value)> {
238        self.driver.decided_value()
239    }
240
241    /// Queue an input for later processing, only keep inputs for the highest height seen so far.
242    pub fn buffer_input(&mut self, height: Ctx::Height, input: Input<Ctx>, _metrics: &Metrics) {
243        self.input_queue.push(height, input);
244
245        #[cfg(feature = "metrics")]
246        {
247            _metrics.queue_heights.set(self.input_queue.len() as i64);
248            _metrics.queue_size.set(self.input_queue.size() as i64);
249        }
250    }
251
252    /// Take all inputs that are pending for the specified height and remove from the input queue.
253    pub fn take_pending_inputs(&mut self, _metrics: &Metrics) -> Vec<Input<Ctx>>
254    where
255        Ctx: Context,
256    {
257        let inputs = self
258            .input_queue
259            .shift_and_take(&self.height())
260            .collect::<Vec<_>>();
261
262        #[cfg(feature = "metrics")]
263        {
264            _metrics.queue_heights.set(self.input_queue.len() as i64);
265            _metrics.queue_size.set(self.input_queue.size() as i64);
266        }
267
268        inputs
269    }
270
271    pub fn print_state(&self) {
272        if let Some(per_round) = self.driver.votes().per_round(self.driver.round()) {
273            info!(
274                "Number of validators having voted: {} / {}",
275                per_round.addresses_weights().get_inner().len(),
276                self.driver.validator_set().count()
277            );
278            info!(
279                "Total voting power of validators: {}",
280                self.driver.validator_set().total_voting_power()
281            );
282            info!(
283                "Voting power required: {}",
284                self.params
285                    .threshold_params
286                    .quorum
287                    .min_expected(self.driver.validator_set().total_voting_power())
288            );
289            info!(
290                "Total voting power of validators having voted: {}",
291                per_round.addresses_weights().sum()
292            );
293            info!(
294                "Total voting power of validators having prevoted nil: {}",
295                per_round
296                    .votes()
297                    .get_weight(VoteType::Prevote, &NilOrVal::Nil)
298            );
299            info!(
300                "Total voting power of validators having precommited nil: {}",
301                per_round
302                    .votes()
303                    .get_weight(VoteType::Precommit, &NilOrVal::Nil)
304            );
305            info!(
306                "Total weight of prevotes: {}",
307                per_round.votes().weight_sum(VoteType::Prevote)
308            );
309            info!(
310                "Total weight of precommits: {}",
311                per_round.votes().weight_sum(VoteType::Precommit)
312            );
313        }
314    }
315
316    /// Check if this node is an active validator.
317    ///
318    /// Returns true only if:
319    /// - Consensus is enabled in the configuration, AND
320    /// - This node is present in the current validator set
321    pub fn is_active_validator(&self) -> bool {
322        self.params.enabled
323            && self
324                .validator_set()
325                .get_by_address(self.address())
326                .is_some()
327    }
328
329    pub fn round_certificate(&self) -> Option<&EnterRoundCertificate<Ctx>> {
330        self.driver.round_certificate.as_ref()
331    }
332}