Skip to main content

vrf_pbft/
engine.rs

1use crate::error::{Error, Result};
2use crate::node::Node;
3use crate::pbft::{Message, Round};
4use crate::types::{Block, Role, Stage};
5
6pub struct Config {
7    fault_tolerance: u64,
8    // Per-role expected committee sizes for VRF sortition
9    expected_proposers: u64,
10    expected_validators: u64,
11    expected_packers: u64,
12}
13
14impl Config {
15    /// Create a config with the given fault tolerance `f`.
16    /// The system tolerates up to `f` byzantine nodes.
17    /// Requires at least `3f + 1` total nodes.
18    pub fn new(fault_tolerance: u64) -> Self {
19        Self {
20            fault_tolerance,
21            expected_proposers: 1,
22            expected_validators: 3 * fault_tolerance + 1,
23            expected_packers: fault_tolerance + 1,
24        }
25    }
26
27    pub fn with_expected_committee(mut self, n: u64) -> Self {
28        self.expected_validators = n;
29        self
30    }
31
32    /// Minimum votes needed for quorum: 2f + 1
33    pub fn threshold(&self) -> u64 {
34        2 * self.fault_tolerance + 1
35    }
36
37    /// Minimum nodes needed: 3f + 1
38    pub fn min_nodes(&self) -> u64 {
39        3 * self.fault_tolerance + 1
40    }
41}
42
43pub struct ConsensusEngine {
44    nodes: Vec<Node>,
45    config: Config,
46    round: u64,
47    committed: Vec<Block>,
48    messages: Vec<Message>,
49}
50
51impl ConsensusEngine {
52    pub fn new(config: Config) -> Self {
53        Self {
54            nodes: Vec::new(),
55            config,
56            round: 0,
57            committed: Vec::new(),
58            messages: Vec::new(),
59        }
60    }
61
62    pub fn add_node(&mut self, node: Node) {
63        self.nodes.push(node);
64    }
65
66    pub fn committed_blocks(&self) -> &[Block] {
67        &self.committed
68    }
69
70    pub fn round(&self) -> u64 {
71        self.round
72    }
73
74    pub fn nodes(&self) -> &[Node] {
75        &self.nodes
76    }
77
78    pub fn messages(&self) -> &[Message] {
79        &self.messages
80    }
81
82    fn total_weight(&self) -> u64 {
83        self.nodes.iter().map(|n| n.weight()).sum()
84    }
85
86    /// Run a single consensus round through all PBFT phases.
87    pub fn run_round(&mut self) -> Result<Block> {
88        let min = self.config.min_nodes() as usize;
89        if self.nodes.len() < min {
90            return Err(Error::NotEnoughNodes {
91                needed: min,
92                have: self.nodes.len(),
93            });
94        }
95
96        self.round += 1;
97        self.messages.clear();
98        let total_weight = self.total_weight();
99        let role_expected = [
100            (Role::Proposer, self.config.expected_proposers),
101            (Role::Validator, self.config.expected_validators),
102            (Role::Packer, self.config.expected_packers),
103        ];
104
105        // Phase 0: VRF role assignment
106        for node in &mut self.nodes {
107            node.assign_role(self.round, total_weight, &role_expected)?;
108        }
109
110        // Find proposer (first one wins if multiple)
111        let proposer_idx = self
112            .nodes
113            .iter()
114            .position(|n| n.role() == Role::Proposer)
115            .ok_or(Error::NoProposer(self.round))?;
116
117        // Collect validator indices
118        let validator_indices: Vec<usize> = self
119            .nodes
120            .iter()
121            .enumerate()
122            .filter(|(_, n)| n.role() == Role::Validator)
123            .map(|(i, _)| i)
124            .collect();
125
126        // Adaptive threshold: 2/3+1 of actual committee, capped at 2f+1
127        let effective_threshold = if validator_indices.is_empty() {
128            self.config.threshold()
129        } else {
130            let adaptive = (validator_indices.len() as u64 * 2 / 3) + 1;
131            adaptive.min(self.config.threshold())
132        };
133
134        // Protocol-level round tracks votes and stage transitions
135        let mut protocol = Round::new(self.round, Role::Proposer, effective_threshold);
136
137        // Phase 1: PrePrepare -- proposer creates block
138        let block = self.nodes[proposer_idx].propose_block(self.round)?;
139        let block_hash = block.hash();
140        protocol.set_block(block.clone());
141        self.messages.push(Message::pre_prepare(
142            self.nodes[proposer_idx].id(),
143            self.round,
144            block.clone(),
145        ));
146
147        protocol.advance(Stage::Prepare)?;
148
149        // Phase 2: Prepare -- validators vote on the block
150        for &idx in &validator_indices {
151            let approve = self.nodes[idx].validate_block(&block);
152            if approve {
153                protocol.add_prepare_vote(self.nodes[idx].id());
154            }
155            self.messages.push(Message::prepare(
156                self.nodes[idx].id(),
157                self.round,
158                block_hash,
159                approve,
160            ));
161        }
162
163        if !protocol.has_prepare_quorum() {
164            return Err(Error::InsufficientVotes {
165                needed: effective_threshold,
166                got: protocol.prepare_count(),
167            });
168        }
169
170        protocol.advance(Stage::Commit)?;
171
172        // Phase 3: Commit -- validators confirm
173        for &idx in &validator_indices {
174            let approve = self.nodes[idx].validate_block(&block);
175            if approve {
176                protocol.add_commit_vote(self.nodes[idx].id());
177            }
178            self.messages.push(Message::commit(
179                self.nodes[idx].id(),
180                self.round,
181                block_hash,
182                approve,
183            ));
184        }
185
186        if !protocol.has_commit_quorum() {
187            return Err(Error::InsufficientVotes {
188                needed: effective_threshold,
189                got: protocol.commit_count(),
190            });
191        }
192
193        protocol.advance(Stage::Reply)?;
194
195        // Phase 4: Reply -- proposer broadcasts final block, all nodes commit
196        self.messages.push(Message::reply(
197            self.nodes[proposer_idx].id(),
198            self.round,
199            block.clone(),
200        ));
201
202        for node in &mut self.nodes {
203            node.commit_block(block.clone());
204        }
205
206        self.committed.push(block.clone());
207        Ok(block)
208    }
209
210    /// Run multiple rounds. Skips rounds where VRF doesn't elect a proposer.
211    pub fn run(&mut self, rounds: u64) -> Result<Vec<Block>> {
212        let mut blocks = Vec::new();
213        for _ in 0..rounds {
214            match self.run_round() {
215                Ok(block) => blocks.push(block),
216                Err(Error::NoProposer(_)) => continue,
217                Err(Error::InsufficientVotes { .. }) => continue,
218                Err(e) => return Err(e),
219            }
220        }
221        Ok(blocks)
222    }
223}