Skip to main content

aethex_consensus/
engine.rs

1use crate::{
2    quorum::{AddResult, QuorumCert, VoteAccumulator},
3    ValidatorSet, Vote, VoteType,
4};
5use axiom_core::{
6    config::ChainConfig,
7    types::{Block, Height, Round},
8};
9use axiom_crypto::Hash;
10use parking_lot::Mutex;
11use std::sync::Arc;
12use tracing::{debug, info, warn};
13
14/// Consensus phase within a single round.
15#[derive(Clone, Copy, Debug, PartialEq, Eq)]
16pub enum Phase {
17    Propose,
18    Prevote,
19    Precommit,
20    Commit,
21}
22
23/// Per-height round state.
24#[derive(Debug)]
25struct RoundState {
26    round: Round,
27    phase: Phase,
28    proposal: Option<Block>,
29    prevotes: VoteAccumulator,
30    precommits: VoteAccumulator,
31    locked_hash: Option<Hash>,
32    locked_round: Option<Round>,
33}
34
35impl RoundState {
36    fn new(round: Round) -> Self {
37        RoundState {
38            round,
39            phase: Phase::Propose,
40            proposal: None,
41            prevotes: VoteAccumulator::default(),
42            precommits: VoteAccumulator::default(),
43            locked_hash: None,
44            locked_round: None,
45        }
46    }
47}
48
49/// Consensus events that the Engine can emit.
50#[derive(Debug)]
51pub enum ConsensusEvent {
52    /// Engine has committed a block at a height.
53    Committed { height: Height, block: Block, qc: QuorumCert },
54    /// Engine needs the application to create a proposal.
55    NeedProposal { height: Height, round: Round },
56    /// Engine timed out and advanced to a new round.
57    RoundTimeout { height: Height, round: Round },
58    /// Equivocation detected.
59    Equivocation { validator: String, height: Height },
60}
61
62/// The core consensus state machine.
63///
64/// Thread-safe: all mutable state is behind a `Mutex`.
65pub struct Engine {
66    config: ChainConfig,
67    validator_set: ValidatorSet,
68    inner: Mutex<EngineInner>,
69}
70
71struct EngineInner {
72    height: Height,
73    round_state: RoundState,
74    committed: Vec<(Height, Hash)>,
75}
76
77impl Engine {
78    pub fn new(config: ChainConfig, validator_set: ValidatorSet, start_height: Height) -> Self {
79        let inner = EngineInner {
80            height: start_height,
81            round_state: RoundState::new(0),
82            committed: Vec::new(),
83        };
84        Engine { config, validator_set, inner: Mutex::new(inner) }
85    }
86
87    pub fn height(&self) -> Height {
88        self.inner.lock().height
89    }
90
91    pub fn round(&self) -> Round {
92        self.inner.lock().round_state.round
93    }
94
95    /// Set a received block proposal for the current round.
96    pub fn receive_proposal(&self, block: Block) -> Option<ConsensusEvent> {
97        let mut inner = self.inner.lock();
98        let height = inner.height;
99        let round = inner.round_state.round;
100        let phase = inner.round_state.phase;
101
102        if phase != Phase::Propose {
103            debug!("ignoring late proposal in phase {:?}", phase);
104            return None;
105        }
106        if block.height() != height {
107            warn!("proposal height mismatch: {} vs {}", block.height(), height);
108            return None;
109        }
110
111        info!(height, round, hash = %block.hash(), "received proposal");
112        inner.round_state.proposal = Some(block);
113        inner.round_state.phase = Phase::Prevote;
114        None
115    }
116
117    /// Process an incoming vote; returns an event if a threshold was crossed.
118    pub fn receive_vote(&self, vote: Vote) -> Option<ConsensusEvent> {
119        let mut inner = self.inner.lock();
120        let threshold = self.config.quorum_threshold(self.validator_set.total_power());
121        let height = inner.height;
122        let round = inner.round_state.round;
123
124        if vote.height != height || vote.round != round {
125            debug!("ignoring vote for h={} r={}", vote.height, vote.round);
126            return None;
127        }
128
129        let rs = &mut inner.round_state;
130
131        match vote.vote_type {
132            VoteType::Prevote => {
133                if rs.add_prevote(vote, &self.validator_set) == AddResult::Added {
134                    if let Some(hash) = rs.prevote_quorum_hash(&self.validator_set, threshold) {
135                        info!(height, round, %hash, "prevote quorum reached");
136                        rs.locked_hash = Some(hash);
137                        rs.locked_round = Some(round);
138                        rs.phase = Phase::Precommit;
139                    }
140                }
141            }
142            VoteType::Precommit => {
143                let block_hash = vote.block_hash;
144                if rs.add_precommit(vote, &self.validator_set) == AddResult::Added {
145                    if let Some(hash) = block_hash {
146                        if rs.precommits.has_quorum(Some(hash), &self.validator_set, threshold) {
147                            let qc = QuorumCert::try_build(
148                                &rs.precommits,
149                                height,
150                                round,
151                                hash,
152                                &self.validator_set,
153                                threshold,
154                            )?;
155                            let block = rs.proposal.clone()?;
156                            info!(height, %hash, "precommit quorum — committing block");
157                            inner.committed.push((height, hash));
158                            inner.height += 1;
159                            inner.round_state = RoundState::new(0);
160                            return Some(ConsensusEvent::Committed { height, block, qc });
161                        }
162                    }
163                }
164            }
165        }
166        None
167    }
168
169    /// Advance to the next round (called on timeout).
170    pub fn timeout(&self) -> ConsensusEvent {
171        let mut inner = self.inner.lock();
172        let height = inner.height;
173        let old_round = inner.round_state.round;
174        let new_round = old_round + 1;
175        inner.round_state = RoundState::new(new_round);
176        warn!(height, old_round, new_round, "consensus timeout — advancing round");
177        ConsensusEvent::RoundTimeout { height, round: old_round }
178    }
179
180    pub fn committed_blocks(&self) -> Vec<(Height, Hash)> {
181        self.inner.lock().committed.clone()
182    }
183}
184
185impl RoundState {
186    fn add_prevote(&mut self, vote: Vote, vset: &ValidatorSet) -> AddResult {
187        self.prevotes.add(vote, vset)
188    }
189
190    fn add_precommit(&mut self, vote: Vote, vset: &ValidatorSet) -> AddResult {
191        self.precommits.add(vote, vset)
192    }
193
194    fn prevote_quorum_hash(&self, vset: &ValidatorSet, threshold: u64) -> Option<Hash> {
195        for h in self.prevotes.candidate_hashes() {
196            if self.prevotes.has_quorum(Some(h), vset, threshold) {
197                return Some(h);
198            }
199        }
200        None
201    }
202}
203
204// Expose a shared Engine handle.
205pub type SharedEngine = Arc<Engine>;
206
207#[cfg(test)]
208mod tests {
209    use super::*;
210    use crate::validator_set::Validator;
211    use axiom_core::{
212        config::ChainConfig,
213        types::{Block, BlockHeader},
214    };
215    use axiom_crypto::{Hash, Keypair};
216
217    fn test_setup(n: usize) -> (SharedEngine, ValidatorSet, Vec<Keypair>) {
218        let kps: Vec<Keypair> = (0..n as u8).map(|i| Keypair::from_bytes(&[i + 1; 32])).collect();
219        let validators: Vec<Validator> = kps
220            .iter()
221            .map(|kp| Validator { id: kp.public_key(), voting_power: 100 })
222            .collect();
223        let vset = ValidatorSet::new(validators);
224        let engine = Arc::new(Engine::new(ChainConfig::default(), vset.clone(), 1));
225        (engine, vset, kps)
226    }
227
228    fn make_block(height: u64, proposer: &Keypair) -> Block {
229        let header = BlockHeader {
230            chain_id: "axiom-devnet-1".into(),
231            height,
232            epoch: 0,
233            round: 0,
234            timestamp: 0,
235            parent_hash: Hash::ZERO,
236            tx_root: Hash::ZERO,
237            state_root: Hash::ZERO,
238            quorum_cert: Hash::ZERO,
239            proposer: proposer.public_key(),
240        };
241        let sig = proposer.sign(header.hash().as_bytes());
242        Block { header, transactions: vec![], proposer_sig: sig }
243    }
244
245    #[test]
246    fn full_consensus_round() {
247        let (engine, _vset, kps) = test_setup(4);
248        let block = make_block(1, &kps[0]);
249        let block_hash = block.hash();
250
251        engine.receive_proposal(block.clone());
252
253        // 3 prevotes (quorum)
254        for kp in kps.iter().take(3) {
255            let v = Vote::sign(VoteType::Prevote, 1, 0, Some(block_hash), kp);
256            engine.receive_vote(v);
257        }
258
259        // 3 precommits
260        let mut result = None;
261        for kp in kps.iter().take(3) {
262            let v = Vote::sign(VoteType::Precommit, 1, 0, Some(block_hash), kp);
263            result = engine.receive_vote(v);
264        }
265
266        assert!(matches!(result, Some(ConsensusEvent::Committed { height: 1, .. })));
267        assert_eq!(engine.height(), 2);
268    }
269}