1use crate::error::{Error, Result};
2use crate::node::Node;
3use crate::pbft::{Message, Round};
4use crate::types::{Block, Role, Stage};
5
/// Tunable parameters for the consensus engine.
///
/// All derived quantities are computed with saturating arithmetic, so an
/// absurdly large `fault_tolerance` yields `u64::MAX` instead of panicking
/// (debug builds) or silently wrapping to a tiny quorum (release builds).
#[derive(Debug, Clone)]
pub struct Config {
    /// Number of faulty nodes (`f`) the configuration is sized for.
    fault_tolerance: u64,
    /// Expected number of nodes taking the proposer role per round.
    expected_proposers: u64,
    /// Expected number of nodes taking the validator role per round.
    expected_validators: u64,
    /// Expected number of nodes taking the packer role per round.
    expected_packers: u64,
}

impl Config {
    /// Builds a configuration for `fault_tolerance` (`f`) faulty nodes.
    ///
    /// Defaults: one expected proposer, `3f + 1` expected validators and
    /// `f + 1` expected packers.
    pub fn new(fault_tolerance: u64) -> Self {
        Self {
            fault_tolerance,
            expected_proposers: 1,
            expected_validators: fault_tolerance.saturating_mul(3).saturating_add(1),
            expected_packers: fault_tolerance.saturating_add(1),
        }
    }

    /// Overrides the expected number of validators with `n`, leaving every
    /// other setting untouched.
    pub fn with_expected_committee(mut self, n: u64) -> Self {
        self.expected_validators = n;
        self
    }

    /// Returns the configured vote threshold, `2f + 1` (saturating at `u64::MAX`).
    pub fn threshold(&self) -> u64 {
        self.fault_tolerance.saturating_mul(2).saturating_add(1)
    }

    /// Returns the minimum number of nodes required to run a round,
    /// `3f + 1` (saturating at `u64::MAX`).
    pub fn min_nodes(&self) -> u64 {
        self.fault_tolerance.saturating_mul(3).saturating_add(1)
    }
}
42
43pub struct ConsensusEngine {
44 nodes: Vec<Node>,
45 config: Config,
46 round: u64,
47 committed: Vec<Block>,
48 messages: Vec<Message>,
49}
50
51impl ConsensusEngine {
52 pub fn new(config: Config) -> Self {
53 Self {
54 nodes: Vec::new(),
55 config,
56 round: 0,
57 committed: Vec::new(),
58 messages: Vec::new(),
59 }
60 }
61
62 pub fn add_node(&mut self, node: Node) {
63 self.nodes.push(node);
64 }
65
66 pub fn committed_blocks(&self) -> &[Block] {
67 &self.committed
68 }
69
70 pub fn round(&self) -> u64 {
71 self.round
72 }
73
74 pub fn nodes(&self) -> &[Node] {
75 &self.nodes
76 }
77
78 pub fn messages(&self) -> &[Message] {
79 &self.messages
80 }
81
82 fn total_weight(&self) -> u64 {
83 self.nodes.iter().map(|n| n.weight()).sum()
84 }
85
86 pub fn run_round(&mut self) -> Result<Block> {
88 let min = self.config.min_nodes() as usize;
89 if self.nodes.len() < min {
90 return Err(Error::NotEnoughNodes {
91 needed: min,
92 have: self.nodes.len(),
93 });
94 }
95
96 self.round += 1;
97 self.messages.clear();
98 let total_weight = self.total_weight();
99 let role_expected = [
100 (Role::Proposer, self.config.expected_proposers),
101 (Role::Validator, self.config.expected_validators),
102 (Role::Packer, self.config.expected_packers),
103 ];
104
105 for node in &mut self.nodes {
107 node.assign_role(self.round, total_weight, &role_expected)?;
108 }
109
110 let proposer_idx = self
112 .nodes
113 .iter()
114 .position(|n| n.role() == Role::Proposer)
115 .ok_or(Error::NoProposer(self.round))?;
116
117 let validator_indices: Vec<usize> = self
119 .nodes
120 .iter()
121 .enumerate()
122 .filter(|(_, n)| n.role() == Role::Validator)
123 .map(|(i, _)| i)
124 .collect();
125
126 let effective_threshold = if validator_indices.is_empty() {
128 self.config.threshold()
129 } else {
130 let adaptive = (validator_indices.len() as u64 * 2 / 3) + 1;
131 adaptive.min(self.config.threshold())
132 };
133
134 let mut protocol = Round::new(self.round, Role::Proposer, effective_threshold);
136
137 let block = self.nodes[proposer_idx].propose_block(self.round)?;
139 let block_hash = block.hash();
140 protocol.set_block(block.clone());
141 self.messages.push(Message::pre_prepare(
142 self.nodes[proposer_idx].id(),
143 self.round,
144 block.clone(),
145 ));
146
147 protocol.advance(Stage::Prepare)?;
148
149 for &idx in &validator_indices {
151 let approve = self.nodes[idx].validate_block(&block);
152 if approve {
153 protocol.add_prepare_vote(self.nodes[idx].id());
154 }
155 self.messages.push(Message::prepare(
156 self.nodes[idx].id(),
157 self.round,
158 block_hash,
159 approve,
160 ));
161 }
162
163 if !protocol.has_prepare_quorum() {
164 return Err(Error::InsufficientVotes {
165 needed: effective_threshold,
166 got: protocol.prepare_count(),
167 });
168 }
169
170 protocol.advance(Stage::Commit)?;
171
172 for &idx in &validator_indices {
174 let approve = self.nodes[idx].validate_block(&block);
175 if approve {
176 protocol.add_commit_vote(self.nodes[idx].id());
177 }
178 self.messages.push(Message::commit(
179 self.nodes[idx].id(),
180 self.round,
181 block_hash,
182 approve,
183 ));
184 }
185
186 if !protocol.has_commit_quorum() {
187 return Err(Error::InsufficientVotes {
188 needed: effective_threshold,
189 got: protocol.commit_count(),
190 });
191 }
192
193 protocol.advance(Stage::Reply)?;
194
195 self.messages.push(Message::reply(
197 self.nodes[proposer_idx].id(),
198 self.round,
199 block.clone(),
200 ));
201
202 for node in &mut self.nodes {
203 node.commit_block(block.clone());
204 }
205
206 self.committed.push(block.clone());
207 Ok(block)
208 }
209
210 pub fn run(&mut self, rounds: u64) -> Result<Vec<Block>> {
212 let mut blocks = Vec::new();
213 for _ in 0..rounds {
214 match self.run_round() {
215 Ok(block) => blocks.push(block),
216 Err(Error::NoProposer(_)) => continue,
217 Err(Error::InsufficientVotes { .. }) => continue,
218 Err(e) => return Err(e),
219 }
220 }
221 Ok(blocks)
222 }
223}