agentic_payments/consensus/
bft.rs

1//! Byzantine Fault Tolerant Consensus Implementation
2//!
3//! Implements Practical Byzantine Fault Tolerance (PBFT) algorithm with:
4//! - Three-phase commit protocol (pre-prepare, prepare, commit)
5//! - View change mechanism for leader failures
6//! - Weighted voting with reputation
7//! - Byzantine fault detection and recovery
8
9use super::{
10    quorum::{Quorum, QuorumConfig},
11    reputation::{ReputationConfig, ReputationSystem},
12    voting::{VoteCollector, VotingConfig},
13    Authority, AuthorityId, BftConsensusResult, Consensus, ConsensusPhase, RoundId, Vote,
14    VoteValue,
15};
16use crate::error::{Error, Result};
17use serde::{Deserialize, Serialize};
18use std::collections::{HashMap, HashSet};
19use std::time::{Duration, Instant};
20
21/// BFT consensus configuration
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct BftConfig {
24    pub quorum_config: QuorumConfig,
25    pub voting_config: VotingConfig,
26    pub reputation_config: ReputationConfig,
27    /// Maximum rounds before view change
28    pub max_rounds_per_view: u64,
29    /// Timeout for each phase
30    pub phase_timeout: Duration,
31    /// Enable reputation-based weights
32    pub use_reputation_weights: bool,
33}
34
35impl Default for BftConfig {
36    fn default() -> Self {
37        BftConfig {
38            quorum_config: QuorumConfig::byzantine(1),
39            voting_config: VotingConfig::default(),
40            reputation_config: ReputationConfig::default(),
41            max_rounds_per_view: 100,
42            phase_timeout: Duration::from_secs(30),
43            use_reputation_weights: true,
44        }
45    }
46}
47
48/// PBFT consensus state for a round
49struct RoundState {
50    round_id: RoundId,
51    view: u64,
52    phase: ConsensusPhase,
53    proposed_value: Option<VoteValue>,
54    pre_prepare_received: bool,
55    prepare_votes: VoteCollector,
56    commit_votes: VoteCollector,
57    phase_start: Instant,
58}
59
60impl RoundState {
61    fn new(round_id: RoundId, view: u64, voting_config: VotingConfig) -> Self {
62        RoundState {
63            round_id,
64            view,
65            phase: ConsensusPhase::Idle,
66            proposed_value: None,
67            pre_prepare_received: false,
68            prepare_votes: VoteCollector::new(round_id, voting_config.clone()),
69            commit_votes: VoteCollector::new(round_id, voting_config),
70            phase_start: Instant::now(),
71        }
72    }
73}
74
75/// Byzantine Fault Tolerant consensus
76pub struct BftConsensus {
77    config: BftConfig,
78    current_round: RoundId,
79    current_view: u64,
80    quorum: Quorum,
81    reputation: ReputationSystem,
82    round_state: Option<RoundState>,
83    consensus_result: Option<BftConsensusResult>,
84    view_change_votes: HashMap<u64, HashSet<AuthorityId>>,
85    primary_authority: AuthorityId,
86}
87
88impl BftConsensus {
89    pub fn new(config: BftConfig, authorities: Vec<Authority>) -> Result<Self> {
90        let primary_authority = authorities
91            .first()
92            .ok_or_else(|| Error::InvalidState {
93                message: "No authorities provided".to_string(),
94            })?
95            .id
96            .clone();
97
98        let quorum = Quorum::new(config.quorum_config.clone(), authorities)?;
99        let reputation = ReputationSystem::new(config.reputation_config.clone());
100
101        Ok(BftConsensus {
102            config,
103            current_round: RoundId(0),
104            current_view: 0,
105            quorum,
106            reputation,
107            round_state: None,
108            consensus_result: None,
109            view_change_votes: HashMap::new(),
110            primary_authority,
111        })
112    }
113
114    /// Get current primary authority for the view
115    fn get_primary(&self) -> &AuthorityId {
116        let authorities = self.quorum.authorities();
117        let index = (self.current_view as usize) % authorities.len();
118        &authorities[index].id
119    }
120
121    /// Check if authority is current primary
122    fn is_primary(&self, authority: &AuthorityId) -> bool {
123        self.get_primary() == authority
124    }
125
126    /// Get effective vote weight with reputation
127    fn get_vote_weight(&self, authority: &AuthorityId) -> Result<u64> {
128        let base_weight = self.quorum.get_weight(authority)?;
129
130        if !self.config.use_reputation_weights {
131            return Ok(base_weight);
132        }
133
134        let reputation = self
135            .reputation
136            .calculate_weighted_reputation(authority)
137            .unwrap_or(1.0);
138
139        Ok((base_weight as f64 * reputation) as u64)
140    }
141
142    /// Handle pre-prepare message from primary
143    pub fn handle_pre_prepare(&mut self, value: VoteValue) -> Result<()> {
144        let state = self.round_state.as_mut().ok_or_else(|| {
145            Error::InvalidState {
146                message: "No active round".to_string(),
147            }
148        })?;
149
150        if state.phase != ConsensusPhase::Idle {
151            return Err(Error::InvalidState {
152                message: format!("Wrong phase: {:?}", state.phase),
153            });
154        }
155
156        state.proposed_value = Some(value);
157        state.pre_prepare_received = true;
158        state.phase = ConsensusPhase::PrePrepare;
159        state.phase_start = Instant::now();
160
161        Ok(())
162    }
163
164    /// Handle prepare vote
165    pub fn handle_prepare(&mut self, vote: Vote) -> Result<()> {
166        let state = self.round_state.as_mut().ok_or_else(|| {
167            Error::InvalidState {
168                message: "No active round".to_string(),
169            }
170        })?;
171
172        if state.phase != ConsensusPhase::PrePrepare && state.phase != ConsensusPhase::Prepare {
173            return Err(Error::InvalidState {
174                message: format!("Wrong phase for prepare: {:?}", state.phase),
175            });
176        }
177
178        state.prepare_votes.add_vote(vote)?;
179        state.phase = ConsensusPhase::Prepare;
180
181        // Check if we have prepare quorum
182        let total_weight = state.prepare_votes.get_total_weight();
183        if self.quorum.has_quorum(total_weight) {
184            state.phase = ConsensusPhase::Commit;
185            state.phase_start = Instant::now();
186        }
187
188        Ok(())
189    }
190
191    /// Handle commit vote
192    pub fn handle_commit(&mut self, vote: Vote) -> Result<()> {
193        let state = self.round_state.as_mut().ok_or_else(|| {
194            Error::InvalidState {
195                message: "No active round".to_string(),
196            }
197        })?;
198
199        if state.phase != ConsensusPhase::Commit {
200            return Err(Error::InvalidState {
201                message: format!("Wrong phase for commit: {:?}", state.phase),
202            });
203        }
204
205        state.commit_votes.add_vote(vote)?;
206
207        // Check if we have commit quorum
208        let total_weight = state.commit_votes.get_total_weight();
209        if self.quorum.has_quorum(total_weight) {
210            self.finalize_consensus()?;
211        }
212
213        Ok(())
214    }
215
216    /// Finalize consensus
217    fn finalize_consensus(&mut self) -> Result<()> {
218        let state = self.round_state.as_ref().ok_or_else(|| {
219            Error::InvalidState {
220                message: "No active round".to_string(),
221            }
222        })?;
223
224        let leading = state
225            .commit_votes
226            .get_leading_value()
227            .ok_or_else(|| Error::InvalidState {
228                message: "No leading value in commit phase".to_string(),
229            })?;
230
231        let result = BftConsensusResult {
232            round_id: state.round_id,
233            value: leading.value.clone(),
234            total_weight: leading.total_weight,
235            participating_authorities: leading.authorities.clone(),
236            phase: ConsensusPhase::Decided,
237        };
238
239        // Update reputation for participants
240        for authority in &result.participating_authorities {
241            let _ = self.reputation.record_correct_vote(authority);
242        }
243
244        self.consensus_result = Some(result);
245        if let Some(state) = self.round_state.as_mut() {
246            state.phase = ConsensusPhase::Decided;
247        }
248
249        Ok(())
250    }
251
252    /// Handle view change
253    pub fn handle_view_change(&mut self, new_view: u64, authority: AuthorityId) -> Result<()> {
254        self.view_change_votes
255            .entry(new_view)
256            .or_insert_with(HashSet::new)
257            .insert(authority);
258
259        let vote_count = self.view_change_votes.get(&new_view).unwrap().len();
260        let required = self.quorum.authority_count() * 2 / 3;
261
262        if vote_count >= required {
263            self.execute_view_change(new_view)?;
264        }
265
266        Ok(())
267    }
268
269    /// Execute view change
270    fn execute_view_change(&mut self, new_view: u64) -> Result<()> {
271        self.current_view = new_view;
272        self.view_change_votes.clear();
273
274        // Reset round state
275        if let Some(state) = self.round_state.as_mut() {
276            state.phase = ConsensusPhase::ViewChange;
277        }
278
279        Ok(())
280    }
281
282    /// Detect Byzantine behavior
283    pub fn detect_byzantine_faults(&mut self) -> Vec<AuthorityId> {
284        let mut byzantine = Vec::new();
285
286        if let Some(state) = &self.round_state {
287            byzantine.extend(state.prepare_votes.detect_byzantine_authorities());
288            byzantine.extend(state.commit_votes.detect_byzantine_authorities());
289        }
290
291        // Mark as Byzantine and update reputation
292        for auth in &byzantine {
293            let _ = self.quorum.mark_byzantine(auth);
294            let _ = self.reputation.record_byzantine_fault(auth);
295        }
296
297        byzantine
298    }
299}
300
301impl Consensus for BftConsensus {
302    fn submit_vote(&mut self, vote: Vote) -> Result<()> {
303        // Get weighted vote
304        let weight = self.get_vote_weight(&vote.authority)?;
305        let mut weighted_vote = vote;
306        weighted_vote.weight = weight;
307
308        let state = self.round_state.as_ref().ok_or_else(|| {
309            Error::InvalidState {
310                message: "No active round".to_string(),
311            }
312        })?;
313
314        match state.phase {
315            ConsensusPhase::PrePrepare | ConsensusPhase::Prepare => {
316                self.handle_prepare(weighted_vote)
317            }
318            ConsensusPhase::Commit => self.handle_commit(weighted_vote),
319            _ => Err(Error::InvalidState {
320                message: format!("Cannot submit vote in phase: {:?}", state.phase),
321            }),
322        }
323    }
324
325    fn has_consensus(&self) -> bool {
326        self.consensus_result.is_some()
327    }
328
329    fn get_result(&self) -> Option<BftConsensusResult> {
330        self.consensus_result.clone()
331    }
332
333    fn get_phase(&self) -> ConsensusPhase {
334        self.round_state
335            .as_ref()
336            .map(|s| s.phase)
337            .unwrap_or(ConsensusPhase::Idle)
338    }
339
340    fn start_round(&mut self, round_id: RoundId, value: VoteValue) -> Result<()> {
341        if self.has_consensus() {
342            return Err(Error::AlreadyReached);
343        }
344
345        self.current_round = round_id;
346        self.round_state = Some(RoundState::new(
347            round_id,
348            self.current_view,
349            self.config.voting_config.clone(),
350        ));
351
352        // If we're primary, send pre-prepare
353        if self.is_primary(&self.primary_authority) {
354            self.handle_pre_prepare(value)?;
355        }
356
357        Ok(())
358    }
359
360    fn current_round(&self) -> RoundId {
361        self.current_round
362    }
363
364    fn authorities(&self) -> Vec<Authority> {
365        self.quorum.authorities().into_iter().cloned().collect()
366    }
367
368    fn handle_timeout(&mut self) -> Result<()> {
369        let state = self.round_state.as_ref().ok_or_else(|| {
370            Error::InvalidState {
371                message: "No active round".to_string(),
372            }
373        })?;
374
375        if state.phase_start.elapsed() > self.config.phase_timeout {
376            // Trigger view change
377            let new_view = self.current_view + 1;
378            self.handle_view_change(new_view, self.primary_authority.clone())?;
379
380            return Err(Error::ViewChangeRequired("Primary authority has failed".to_string()));
381        }
382
383        Ok(())
384    }
385}
386
387#[cfg(test)]
388mod tests {
389    use super::*;
390
391    fn create_test_authorities(count: usize) -> Vec<Authority> {
392        (0..count)
393            .map(|i| Authority::new(AuthorityId::from(format!("auth-{}", i)), 100))
394            .collect()
395    }
396
397    fn create_vote(round: u64, authority: &str, value: &str) -> Vote {
398        Vote::new(
399            RoundId(round),
400            AuthorityId::from(authority),
401            VoteValue::from_string(value),
402            100,
403        )
404        .with_signature(vec![1, 2, 3])
405    }
406
407    #[test]
408    fn test_bft_creation() {
409        let authorities = create_test_authorities(4);
410        let config = BftConfig::default();
411        let bft = BftConsensus::new(config, authorities);
412
413        assert!(bft.is_ok());
414    }
415
416    #[test]
417    fn test_start_round() {
418        let authorities = create_test_authorities(4);
419        let config = BftConfig::default();
420        let mut bft = BftConsensus::new(config, authorities).unwrap();
421
422        let result = bft.start_round(RoundId(1), VoteValue::from_string("value-a"));
423        assert!(result.is_ok());
424        assert_eq!(bft.current_round(), RoundId(1));
425    }
426
427    #[test]
428    fn test_prepare_phase() {
429        let authorities = create_test_authorities(4);
430        let config = BftConfig::default();
431        let mut bft = BftConsensus::new(config, authorities).unwrap();
432
433        // Initialize round (doesn't call pre_prepare unless node is primary)
434        bft.start_round(RoundId(1), VoteValue::from_string("value-a")).unwrap();
435
436        // Manually call handle_pre_prepare (simulating receiving from primary)
437        bft.handle_pre_prepare(VoteValue::from_string("value-a")).unwrap();
438        assert_eq!(bft.get_phase(), ConsensusPhase::PrePrepare);
439
440        let vote = create_vote(1, "auth-0", "value-a");
441        assert!(bft.handle_prepare(vote).is_ok());
442        assert_eq!(bft.get_phase(), ConsensusPhase::Prepare);
443    }
444
445    #[test]
446    fn test_full_consensus() {
447        let authorities = create_test_authorities(4);
448        let config = BftConfig::default();
449        let mut bft = BftConsensus::new(config, authorities).unwrap();
450
451        // Start round
452        bft.start_round(RoundId(1), VoteValue::from_string("value-a")).unwrap();
453        bft.handle_pre_prepare(VoteValue::from_string("value-a")).unwrap();
454
455        // Prepare phase - need 3 votes for quorum (2/3 of 4)
456        for i in 0..3 {
457            let vote = create_vote(1, &format!("auth-{}", i), "value-a");
458            bft.handle_prepare(vote).unwrap();
459        }
460
461        assert_eq!(bft.get_phase(), ConsensusPhase::Commit);
462
463        // Commit phase
464        for i in 0..3 {
465            let vote = create_vote(1, &format!("auth-{}", i), "value-a");
466            bft.handle_commit(vote).unwrap();
467        }
468
469        assert!(bft.has_consensus());
470        assert_eq!(bft.get_phase(), ConsensusPhase::Decided);
471
472        let result = bft.get_result().unwrap();
473        assert_eq!(result.value, VoteValue::from_string("value-a"));
474    }
475
476    #[test]
477    fn test_insufficient_authorities() {
478        let authorities = create_test_authorities(2);
479        let config = BftConfig::default(); // Requires 4 for f=1
480        let result = BftConsensus::new(config, authorities);
481
482        assert!(result.is_err());
483    }
484
485    #[test]
486    fn test_view_change() {
487        let authorities = create_test_authorities(4);
488        let config = BftConfig::default();
489        let mut bft = BftConsensus::new(config, authorities).unwrap();
490
491        let initial_view = bft.current_view;
492
493        // Need 3 votes for view change (2/3 of 4)
494        for i in 0..3 {
495            bft.handle_view_change(1, AuthorityId::from(format!("auth-{}", i)))
496                .unwrap();
497        }
498
499        assert_eq!(bft.current_view, initial_view + 1);
500    }
501
502    #[test]
503    fn test_primary_rotation() {
504        let authorities = create_test_authorities(4);
505        let config = BftConfig::default();
506        let mut bft = BftConsensus::new(config, authorities).unwrap();
507
508        // Get initial primary (order depends on HashMap iteration)
509        let primary_view_0 = bft.get_primary().clone();
510
511        // Verify primary is one of the authorities
512        let all_authorities = bft.authorities();
513        assert!(all_authorities.iter().any(|a| a.id == primary_view_0));
514
515        // Trigger view change and verify primary changes
516        bft.current_view = 1;
517        let primary_view_1 = bft.get_primary().clone();
518
519        // In most cases primary should rotate (unless we only have 1 authority)
520        assert!(all_authorities.len() > 1 || primary_view_0 == primary_view_1);
521    }
522}