agentic_payments/consensus/
bft.rs1use super::{
10 quorum::{Quorum, QuorumConfig},
11 reputation::{ReputationConfig, ReputationSystem},
12 voting::{VoteCollector, VotingConfig},
13 Authority, AuthorityId, BftConsensusResult, Consensus, ConsensusPhase, RoundId, Vote,
14 VoteValue,
15};
16use crate::error::{Error, Result};
17use serde::{Deserialize, Serialize};
18use std::collections::{HashMap, HashSet};
19use std::time::{Duration, Instant};
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct BftConfig {
24 pub quorum_config: QuorumConfig,
25 pub voting_config: VotingConfig,
26 pub reputation_config: ReputationConfig,
27 pub max_rounds_per_view: u64,
29 pub phase_timeout: Duration,
31 pub use_reputation_weights: bool,
33}
34
35impl Default for BftConfig {
36 fn default() -> Self {
37 BftConfig {
38 quorum_config: QuorumConfig::byzantine(1),
39 voting_config: VotingConfig::default(),
40 reputation_config: ReputationConfig::default(),
41 max_rounds_per_view: 100,
42 phase_timeout: Duration::from_secs(30),
43 use_reputation_weights: true,
44 }
45 }
46}
47
48struct RoundState {
50 round_id: RoundId,
51 view: u64,
52 phase: ConsensusPhase,
53 proposed_value: Option<VoteValue>,
54 pre_prepare_received: bool,
55 prepare_votes: VoteCollector,
56 commit_votes: VoteCollector,
57 phase_start: Instant,
58}
59
60impl RoundState {
61 fn new(round_id: RoundId, view: u64, voting_config: VotingConfig) -> Self {
62 RoundState {
63 round_id,
64 view,
65 phase: ConsensusPhase::Idle,
66 proposed_value: None,
67 pre_prepare_received: false,
68 prepare_votes: VoteCollector::new(round_id, voting_config.clone()),
69 commit_votes: VoteCollector::new(round_id, voting_config),
70 phase_start: Instant::now(),
71 }
72 }
73}
74
75pub struct BftConsensus {
77 config: BftConfig,
78 current_round: RoundId,
79 current_view: u64,
80 quorum: Quorum,
81 reputation: ReputationSystem,
82 round_state: Option<RoundState>,
83 consensus_result: Option<BftConsensusResult>,
84 view_change_votes: HashMap<u64, HashSet<AuthorityId>>,
85 primary_authority: AuthorityId,
86}
87
88impl BftConsensus {
89 pub fn new(config: BftConfig, authorities: Vec<Authority>) -> Result<Self> {
90 let primary_authority = authorities
91 .first()
92 .ok_or_else(|| Error::InvalidState {
93 message: "No authorities provided".to_string(),
94 })?
95 .id
96 .clone();
97
98 let quorum = Quorum::new(config.quorum_config.clone(), authorities)?;
99 let reputation = ReputationSystem::new(config.reputation_config.clone());
100
101 Ok(BftConsensus {
102 config,
103 current_round: RoundId(0),
104 current_view: 0,
105 quorum,
106 reputation,
107 round_state: None,
108 consensus_result: None,
109 view_change_votes: HashMap::new(),
110 primary_authority,
111 })
112 }
113
114 fn get_primary(&self) -> &AuthorityId {
116 let authorities = self.quorum.authorities();
117 let index = (self.current_view as usize) % authorities.len();
118 &authorities[index].id
119 }
120
121 fn is_primary(&self, authority: &AuthorityId) -> bool {
123 self.get_primary() == authority
124 }
125
126 fn get_vote_weight(&self, authority: &AuthorityId) -> Result<u64> {
128 let base_weight = self.quorum.get_weight(authority)?;
129
130 if !self.config.use_reputation_weights {
131 return Ok(base_weight);
132 }
133
134 let reputation = self
135 .reputation
136 .calculate_weighted_reputation(authority)
137 .unwrap_or(1.0);
138
139 Ok((base_weight as f64 * reputation) as u64)
140 }
141
142 pub fn handle_pre_prepare(&mut self, value: VoteValue) -> Result<()> {
144 let state = self.round_state.as_mut().ok_or_else(|| {
145 Error::InvalidState {
146 message: "No active round".to_string(),
147 }
148 })?;
149
150 if state.phase != ConsensusPhase::Idle {
151 return Err(Error::InvalidState {
152 message: format!("Wrong phase: {:?}", state.phase),
153 });
154 }
155
156 state.proposed_value = Some(value);
157 state.pre_prepare_received = true;
158 state.phase = ConsensusPhase::PrePrepare;
159 state.phase_start = Instant::now();
160
161 Ok(())
162 }
163
164 pub fn handle_prepare(&mut self, vote: Vote) -> Result<()> {
166 let state = self.round_state.as_mut().ok_or_else(|| {
167 Error::InvalidState {
168 message: "No active round".to_string(),
169 }
170 })?;
171
172 if state.phase != ConsensusPhase::PrePrepare && state.phase != ConsensusPhase::Prepare {
173 return Err(Error::InvalidState {
174 message: format!("Wrong phase for prepare: {:?}", state.phase),
175 });
176 }
177
178 state.prepare_votes.add_vote(vote)?;
179 state.phase = ConsensusPhase::Prepare;
180
181 let total_weight = state.prepare_votes.get_total_weight();
183 if self.quorum.has_quorum(total_weight) {
184 state.phase = ConsensusPhase::Commit;
185 state.phase_start = Instant::now();
186 }
187
188 Ok(())
189 }
190
191 pub fn handle_commit(&mut self, vote: Vote) -> Result<()> {
193 let state = self.round_state.as_mut().ok_or_else(|| {
194 Error::InvalidState {
195 message: "No active round".to_string(),
196 }
197 })?;
198
199 if state.phase != ConsensusPhase::Commit {
200 return Err(Error::InvalidState {
201 message: format!("Wrong phase for commit: {:?}", state.phase),
202 });
203 }
204
205 state.commit_votes.add_vote(vote)?;
206
207 let total_weight = state.commit_votes.get_total_weight();
209 if self.quorum.has_quorum(total_weight) {
210 self.finalize_consensus()?;
211 }
212
213 Ok(())
214 }
215
216 fn finalize_consensus(&mut self) -> Result<()> {
218 let state = self.round_state.as_ref().ok_or_else(|| {
219 Error::InvalidState {
220 message: "No active round".to_string(),
221 }
222 })?;
223
224 let leading = state
225 .commit_votes
226 .get_leading_value()
227 .ok_or_else(|| Error::InvalidState {
228 message: "No leading value in commit phase".to_string(),
229 })?;
230
231 let result = BftConsensusResult {
232 round_id: state.round_id,
233 value: leading.value.clone(),
234 total_weight: leading.total_weight,
235 participating_authorities: leading.authorities.clone(),
236 phase: ConsensusPhase::Decided,
237 };
238
239 for authority in &result.participating_authorities {
241 let _ = self.reputation.record_correct_vote(authority);
242 }
243
244 self.consensus_result = Some(result);
245 if let Some(state) = self.round_state.as_mut() {
246 state.phase = ConsensusPhase::Decided;
247 }
248
249 Ok(())
250 }
251
252 pub fn handle_view_change(&mut self, new_view: u64, authority: AuthorityId) -> Result<()> {
254 self.view_change_votes
255 .entry(new_view)
256 .or_insert_with(HashSet::new)
257 .insert(authority);
258
259 let vote_count = self.view_change_votes.get(&new_view).unwrap().len();
260 let required = self.quorum.authority_count() * 2 / 3;
261
262 if vote_count >= required {
263 self.execute_view_change(new_view)?;
264 }
265
266 Ok(())
267 }
268
269 fn execute_view_change(&mut self, new_view: u64) -> Result<()> {
271 self.current_view = new_view;
272 self.view_change_votes.clear();
273
274 if let Some(state) = self.round_state.as_mut() {
276 state.phase = ConsensusPhase::ViewChange;
277 }
278
279 Ok(())
280 }
281
282 pub fn detect_byzantine_faults(&mut self) -> Vec<AuthorityId> {
284 let mut byzantine = Vec::new();
285
286 if let Some(state) = &self.round_state {
287 byzantine.extend(state.prepare_votes.detect_byzantine_authorities());
288 byzantine.extend(state.commit_votes.detect_byzantine_authorities());
289 }
290
291 for auth in &byzantine {
293 let _ = self.quorum.mark_byzantine(auth);
294 let _ = self.reputation.record_byzantine_fault(auth);
295 }
296
297 byzantine
298 }
299}
300
301impl Consensus for BftConsensus {
302 fn submit_vote(&mut self, vote: Vote) -> Result<()> {
303 let weight = self.get_vote_weight(&vote.authority)?;
305 let mut weighted_vote = vote;
306 weighted_vote.weight = weight;
307
308 let state = self.round_state.as_ref().ok_or_else(|| {
309 Error::InvalidState {
310 message: "No active round".to_string(),
311 }
312 })?;
313
314 match state.phase {
315 ConsensusPhase::PrePrepare | ConsensusPhase::Prepare => {
316 self.handle_prepare(weighted_vote)
317 }
318 ConsensusPhase::Commit => self.handle_commit(weighted_vote),
319 _ => Err(Error::InvalidState {
320 message: format!("Cannot submit vote in phase: {:?}", state.phase),
321 }),
322 }
323 }
324
325 fn has_consensus(&self) -> bool {
326 self.consensus_result.is_some()
327 }
328
329 fn get_result(&self) -> Option<BftConsensusResult> {
330 self.consensus_result.clone()
331 }
332
333 fn get_phase(&self) -> ConsensusPhase {
334 self.round_state
335 .as_ref()
336 .map(|s| s.phase)
337 .unwrap_or(ConsensusPhase::Idle)
338 }
339
340 fn start_round(&mut self, round_id: RoundId, value: VoteValue) -> Result<()> {
341 if self.has_consensus() {
342 return Err(Error::AlreadyReached);
343 }
344
345 self.current_round = round_id;
346 self.round_state = Some(RoundState::new(
347 round_id,
348 self.current_view,
349 self.config.voting_config.clone(),
350 ));
351
352 if self.is_primary(&self.primary_authority) {
354 self.handle_pre_prepare(value)?;
355 }
356
357 Ok(())
358 }
359
360 fn current_round(&self) -> RoundId {
361 self.current_round
362 }
363
364 fn authorities(&self) -> Vec<Authority> {
365 self.quorum.authorities().into_iter().cloned().collect()
366 }
367
368 fn handle_timeout(&mut self) -> Result<()> {
369 let state = self.round_state.as_ref().ok_or_else(|| {
370 Error::InvalidState {
371 message: "No active round".to_string(),
372 }
373 })?;
374
375 if state.phase_start.elapsed() > self.config.phase_timeout {
376 let new_view = self.current_view + 1;
378 self.handle_view_change(new_view, self.primary_authority.clone())?;
379
380 return Err(Error::ViewChangeRequired("Primary authority has failed".to_string()));
381 }
382
383 Ok(())
384 }
385}
386
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `count` authorities named `auth-0`, `auth-1`, ... each with weight 100.
    fn create_test_authorities(count: usize) -> Vec<Authority> {
        let mut authorities = Vec::with_capacity(count);
        for i in 0..count {
            let id = AuthorityId::from(format!("auth-{}", i));
            authorities.push(Authority::new(id, 100));
        }
        authorities
    }

    /// Builds a signed vote with weight 100 for `value` in round `round`.
    fn create_vote(round: u64, authority: &str, value: &str) -> Vote {
        let unsigned = Vote::new(
            RoundId(round),
            AuthorityId::from(authority),
            VoteValue::from_string(value),
            100,
        );
        unsigned.with_signature(vec![1, 2, 3])
    }

    /// Builds an engine with the default configuration over `count` test authorities.
    fn engine_with(count: usize) -> BftConsensus {
        BftConsensus::new(BftConfig::default(), create_test_authorities(count)).unwrap()
    }

    #[test]
    fn test_bft_creation() {
        let bft = BftConsensus::new(BftConfig::default(), create_test_authorities(4));

        assert!(bft.is_ok());
    }

    #[test]
    fn test_start_round() {
        let mut bft = engine_with(4);

        let outcome = bft.start_round(RoundId(1), VoteValue::from_string("value-a"));

        assert!(outcome.is_ok());
        assert_eq!(bft.current_round(), RoundId(1));
    }

    #[test]
    fn test_prepare_phase() {
        let mut bft = engine_with(4);

        bft.start_round(RoundId(1), VoteValue::from_string("value-a"))
            .unwrap();

        bft.handle_pre_prepare(VoteValue::from_string("value-a"))
            .unwrap();
        assert_eq!(bft.get_phase(), ConsensusPhase::PrePrepare);

        let prepare = create_vote(1, "auth-0", "value-a");
        assert!(bft.handle_prepare(prepare).is_ok());
        assert_eq!(bft.get_phase(), ConsensusPhase::Prepare);
    }

    #[test]
    fn test_full_consensus() {
        let mut bft = engine_with(4);

        bft.start_round(RoundId(1), VoteValue::from_string("value-a"))
            .unwrap();
        bft.handle_pre_prepare(VoteValue::from_string("value-a"))
            .unwrap();

        let voters: Vec<String> = (0..3).map(|i| format!("auth-{}", i)).collect();

        for voter in &voters {
            bft.handle_prepare(create_vote(1, voter, "value-a")).unwrap();
        }

        assert_eq!(bft.get_phase(), ConsensusPhase::Commit);

        for voter in &voters {
            bft.handle_commit(create_vote(1, voter, "value-a")).unwrap();
        }

        assert!(bft.has_consensus());
        assert_eq!(bft.get_phase(), ConsensusPhase::Decided);

        let decision = bft.get_result().unwrap();
        assert_eq!(decision.value, VoteValue::from_string("value-a"));
    }

    #[test]
    fn test_insufficient_authorities() {
        let result = BftConsensus::new(BftConfig::default(), create_test_authorities(2));

        assert!(result.is_err());
    }

    #[test]
    fn test_view_change() {
        let mut bft = engine_with(4);
        let initial_view = bft.current_view;

        for i in 0..3 {
            let voter = AuthorityId::from(format!("auth-{}", i));
            bft.handle_view_change(1, voter).unwrap();
        }

        assert_eq!(bft.current_view, initial_view + 1);
    }

    #[test]
    fn test_primary_rotation() {
        let mut bft = engine_with(4);

        let primary_view_0 = bft.get_primary().clone();

        // The primary must always be one of the configured authorities.
        let all_authorities = bft.authorities();
        assert!(all_authorities.iter().any(|a| a.id == primary_view_0));

        bft.current_view = 1;
        let primary_view_1 = bft.get_primary().clone();

        assert!(all_authorities.len() > 1 || primary_view_0 == primary_view_1);
    }
}