scirs2_metrics/optimization/distributed_advanced/consensus/
mod.rs

1//! Consensus algorithms for distributed systems
2//!
3//! This module provides implementations of various consensus algorithms
4//! including Raft, PBFT, Proof of Stake, and simple majority consensus.
5
6pub mod coordinator;
7pub mod majority;
8pub mod pbft;
9pub mod proof_of_stake;
10pub mod raft;
11
12use crate::error::{MetricsError, Result};
13use scirs2_core::numeric::Float;
14use serde::{Deserialize, Serialize};
15use std::collections::{HashMap, HashSet};
16use std::time::{Duration, Instant, SystemTime};
17
18// Re-export main components
19pub use coordinator::*;
20pub use majority::*;
21pub use pbft::*;
22pub use proof_of_stake::*;
23pub use raft::*;
24
/// Tunable parameters for a consensus instance.
///
/// One structure is shared by every [`ConsensusAlgorithm`]. Several fields carry
/// Raft-style names (election timeout, heartbeat, log compaction, snapshots) and
/// `byzantine_threshold` is named after Byzantine fault tolerance.
/// NOTE(review): which fields each algorithm actually reads is decided in the
/// submodules, not in this file — confirm before relying on a field for a
/// particular algorithm.
///
/// `ConsensusConfig::default()` yields a Raft configuration with a quorum of 3.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsensusConfig {
    /// Algorithm to use (defaults to [`ConsensusAlgorithm::Raft`]).
    pub algorithm: ConsensusAlgorithm,

    /// Minimum number of nodes for quorum (defaults to 3).
    pub quorum_size: usize,

    /// Election timeout in milliseconds (defaults to 5000).
    pub election_timeout_ms: u64,

    /// Heartbeat interval in milliseconds (defaults to 1000).
    pub heartbeat_interval_ms: u64,

    /// Maximum entries per append (defaults to 100).
    pub max_entries_per_append: usize,

    /// Log compaction threshold (defaults to 10000).
    ///
    /// NOTE(review): presumably a number of log entries — confirm the unit.
    pub log_compaction_threshold: usize,

    /// Snapshot creation interval (defaults to 300 seconds).
    ///
    /// Unlike the `*_ms` fields this is a [`Duration`], not a raw millisecond count.
    pub snapshot_interval: Duration,

    /// Byzantine fault tolerance threshold (defaults to 1).
    ///
    /// NOTE(review): presumably the number of faulty nodes tolerated — confirm
    /// against the PBFT implementation.
    pub byzantine_threshold: usize,

    /// Enable consensus optimization (defaults to `true`).
    pub enable_optimization: bool,

    /// Consensus timeout in milliseconds (defaults to 30000).
    pub consensus_timeout_ms: u64,
}
58
/// Consensus algorithms supported.
///
/// The first four variants correspond to the sibling modules `raft`, `pbft`,
/// `proof_of_stake` and `majority`. Equality is derived, so two `Custom` values
/// compare equal exactly when their strings are equal.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ConsensusAlgorithm {
    /// Raft consensus algorithm
    Raft,

    /// Practical Byzantine Fault Tolerance
    PBFT,

    /// Proof of Stake consensus
    ProofOfStake,

    /// Simple majority consensus
    SimpleMajority,

    /// Custom consensus algorithm.
    ///
    /// NOTE(review): the string presumably names the algorithm; how it is
    /// interpreted is not visible in this file.
    Custom(String),
}
77
/// Snapshot of the consensus system as reported by
/// [`ConsensusManager::get_state`].
///
/// Combines algorithm-independent information (leader, participants, term,
/// statistics) with an [`AlgorithmSpecificState`] payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsensusSystemState {
    /// Current consensus leader, identified by a string; `None` when there is none.
    ///
    /// NOTE(review): presumably a node identifier of the same kind used in
    /// `active_participants` — confirm.
    pub current_leader: Option<String>,

    /// Active participants (a set, so each identifier appears at most once).
    pub active_participants: HashSet<String>,

    /// Current term/epoch
    pub current_term: u64,

    /// Last consensus timestamp (wall-clock [`SystemTime`]).
    pub last_consensus: SystemTime,

    /// Consensus statistics
    pub consensus_stats: ConsensusStats,

    /// Algorithm-specific state
    pub algorithm_state: AlgorithmSpecificState,
}
99
/// Algorithm-specific consensus state.
///
/// Each built-in variant wraps the state type exported by the corresponding
/// submodule; `Custom` carries free-form string key/value pairs instead.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlgorithmSpecificState {
    /// Raft-specific state (defined in the `raft` module).
    Raft(raft::RaftState),

    /// PBFT-specific state (defined in the `pbft` module).
    PBFT(pbft::PbftState),

    /// Proof of Stake specific state (defined in the `proof_of_stake` module).
    ProofOfStake(proof_of_stake::PoSState),

    /// Simple majority state (defined in the `majority` module).
    SimpleMajority(majority::MajorityState),

    /// Custom algorithm state as string key/value pairs.
    Custom(HashMap<String, String>),
}
118
/// Consensus statistics.
///
/// `Default` is derived, so a fresh value has every counter at `0` and both
/// floating-point fields at `0.0` — including `health_score`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ConsensusStats {
    /// Total consensus decisions made
    pub total_decisions: u64,

    /// Average consensus time (milliseconds)
    pub avg_consensus_time_ms: f64,

    /// Failed consensus attempts
    pub failed_attempts: u64,

    /// Leader changes
    pub leader_changes: u64,

    /// Byzantine failures detected
    pub byzantine_failures: u64,

    /// Network partitions handled
    pub network_partitions: u64,

    /// Current consensus health (0.0-1.0).
    ///
    /// The range is a documented convention only; the type does not enforce it.
    pub health_score: f64,
}
143
/// Consensus proposal for distributed decisions.
///
/// Generic over the payload type `T`. Construct one with
/// `ConsensusProposal::new` and refine it with the `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsensusProposal<T> {
    /// Proposal identifier
    pub proposal_id: String,

    /// Proposer node identifier
    pub proposer: String,

    /// Proposal timestamp (`new` sets this to the system time at construction).
    pub timestamp: SystemTime,

    /// Proposal data
    pub data: T,

    /// Proposal priority (`new` initialises this to 0).
    ///
    /// NOTE(review): whether larger or smaller values are more urgent is not
    /// defined in this file.
    pub priority: u32,

    /// Required quorum size (`new` initialises this to 3).
    pub required_quorum: usize,

    /// Proposal metadata as string key/value pairs (empty after `new`).
    pub metadata: HashMap<String, String>,
}
168
/// Consensus decision result.
///
/// Carries the full originating [`ConsensusProposal`] (including its payload of
/// type `T`) together with the [`DecisionOutcome`] and bookkeeping data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsensusDecision<T> {
    /// Decision identifier
    pub decision_id: String,

    /// Original proposal (stored by value, not by reference or id).
    pub proposal: ConsensusProposal<T>,

    /// Decision outcome
    pub outcome: DecisionOutcome,

    /// Participating nodes
    pub participants: HashSet<String>,

    /// Decision timestamp (separate from the proposal's own `timestamp`).
    pub timestamp: SystemTime,

    /// Consensus algorithm used
    pub algorithm: ConsensusAlgorithm,

    /// Decision metadata as string key/value pairs.
    pub metadata: HashMap<String, String>,
}
193
/// Consensus decision outcomes.
///
/// Per the variant descriptions, `Accepted` and `Rejected` mean consensus was
/// reached; the remaining variants describe why it was not.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum DecisionOutcome {
    /// Consensus reached, proposal accepted
    Accepted,

    /// Consensus reached, proposal rejected
    Rejected,

    /// Consensus failed, timeout occurred
    Timeout,

    /// Consensus failed, insufficient participants
    InsufficientQuorum,

    /// Consensus failed, Byzantine fault detected
    ByzantineFault,

    /// Consensus failed, network partition
    NetworkPartition,
}
215
/// Node vote in consensus.
///
/// Construct one with `ConsensusVote::new`; a signature and metadata can be
/// attached afterwards with the `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsensusVote {
    /// Voting node identifier
    pub voter: String,

    /// Vote decision
    pub vote: VoteDecision,

    /// Vote timestamp (`new` sets this to the system time at construction).
    pub timestamp: SystemTime,

    /// Vote signature/proof; `None` until one is attached.
    ///
    /// NOTE(review): the signature format and who verifies it are not defined
    /// in this file.
    pub signature: Option<String>,

    /// Vote metadata as string key/value pairs (empty after `new`).
    pub metadata: HashMap<String, String>,
}
234
/// Vote decisions.
///
/// NOTE(review): how `Abstain` counts toward a quorum is decided by the
/// individual algorithm implementations, not here.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum VoteDecision {
    /// Vote in favor
    Accept,

    /// Vote against
    Reject,

    /// Abstain from voting
    Abstain,
}
247
/// Consensus manager trait for different algorithms.
///
/// `T` is the payload type carried by [`ConsensusProposal`] and returned inside
/// [`ConsensusDecision`]. Every method is `async` and returns the crate-level
/// `Result`.
// `async fn` in a public trait triggers the `async_fn_in_trait` lint because
// callers cannot put auto-trait bounds such as `Send` on the returned futures;
// the lint is silenced here. The `Send + Sync` supertraits constrain the
// implementing type only, not those futures.
// NOTE(review): confirm no caller needs to spawn these futures on a
// multi-threaded executor.
#[allow(async_fn_in_trait)]
pub trait ConsensusManager<T>: Send + Sync {
    /// Initialize consensus manager
    async fn initialize(&mut self) -> Result<()>;

    /// Propose a value for consensus.
    ///
    /// Takes ownership of the proposal and returns a `String` on success.
    /// NOTE(review): presumably the identifier to pass to `vote` /
    /// `get_decision` — confirm against the implementations.
    async fn propose(&mut self, proposal: ConsensusProposal<T>) -> Result<String>;

    /// Vote on a proposal identified by `proposal_id`.
    async fn vote(&mut self, proposal_id: &str, vote: ConsensusVote) -> Result<()>;

    /// Get consensus decision if available.
    ///
    /// The `Option` in the return type allows "no decision" to be reported
    /// without an error.
    async fn get_decision(&self, proposal_id: &str) -> Result<Option<ConsensusDecision<T>>>;

    /// Handle node failure
    async fn handle_node_failure(&mut self, node_id: &str) -> Result<()>;

    /// Handle node recovery
    async fn handle_node_recovery(&mut self, node_id: &str) -> Result<()>;

    /// Get current consensus state
    async fn get_state(&self) -> Result<ConsensusSystemState>;

    /// Get health score.
    ///
    /// NOTE(review): presumably in the same 0.0-1.0 range documented for
    /// `ConsensusStats::health_score` — confirm.
    async fn get_health_score(&self) -> Result<f64>;

    /// Shutdown consensus manager
    async fn shutdown(&mut self) -> Result<()>;
}
278
/// Consensus event for monitoring and debugging.
///
/// A plain record: the kind of event, when it happened, which nodes it concerns
/// and free-form string details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsensusEvent {
    /// Event identifier
    pub event_id: String,

    /// Event type
    pub event_type: ConsensusEventType,

    /// Event timestamp (wall-clock [`SystemTime`]).
    pub timestamp: SystemTime,

    /// Affected nodes (a `Vec`, so order is kept and duplicates are possible).
    pub nodes: Vec<String>,

    /// Event data as string key/value pairs.
    pub data: HashMap<String, String>,
}
297
298/// Types of consensus events
299#[derive(Debug, Clone, Serialize, Deserialize)]
300pub enum ConsensusEventType {
301    /// New proposal submitted
302    ProposalSubmitted,
303
304    /// Vote cast by node
305    VoteCast,
306
307    /// Consensus decision reached
308    DecisionReached,
309
310    /// Leader election started
311    LeaderElection,
312
313    /// Leader change occurred
314    LeaderChange,
315
316    /// Node failure detected
317    NodeFailure,
318
319    /// Node recovery detected
320    NodeRecovery,
321
322    /// Byzantine behavior detected
323    ByzantineBehavior,
324
325    /// Network partition detected
326    NetworkPartition,
327
328    /// Network partition healed
329    PartitionHealed,
330}
331
/// Consensus performance metrics.
///
/// `Default` is derived, so a fresh value has every field at zero.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ConsensusPerformanceMetrics {
    /// Average proposal processing time (milliseconds, per the `_ms` suffix).
    pub avg_proposal_time_ms: f64,

    /// Average voting time (milliseconds, per the `_ms` suffix).
    pub avg_voting_time_ms: f64,

    /// Consensus success rate.
    ///
    /// NOTE(review): presumably a fraction in 0.0-1.0 — the scale is not stated
    /// in this file.
    pub success_rate: f64,

    /// Network message overhead.
    ///
    /// NOTE(review): unit not defined in this file.
    pub message_overhead: f64,

    /// Memory usage (bytes)
    pub memory_usage_bytes: usize,

    /// CPU utilization (0.0-1.0)
    pub cpu_utilization: f64,
}
353
354impl Default for ConsensusConfig {
355    fn default() -> Self {
356        Self {
357            algorithm: ConsensusAlgorithm::Raft,
358            quorum_size: 3,
359            election_timeout_ms: 5000,
360            heartbeat_interval_ms: 1000,
361            max_entries_per_append: 100,
362            log_compaction_threshold: 10000,
363            snapshot_interval: Duration::from_secs(300), // 5 minutes
364            byzantine_threshold: 1,
365            enable_optimization: true,
366            consensus_timeout_ms: 30000, // 30 seconds
367        }
368    }
369}
370
371impl<T> ConsensusProposal<T> {
372    /// Create new consensus proposal
373    pub fn new(proposal_id: String, proposer: String, data: T) -> Self {
374        Self {
375            proposal_id,
376            proposer,
377            timestamp: SystemTime::now(),
378            data,
379            priority: 0,
380            required_quorum: 3,
381            metadata: HashMap::new(),
382        }
383    }
384
385    /// Set proposal priority
386    pub fn with_priority(mut self, priority: u32) -> Self {
387        self.priority = priority;
388        self
389    }
390
391    /// Set required quorum size
392    pub fn with_quorum(mut self, quorum: usize) -> Self {
393        self.required_quorum = quorum;
394        self
395    }
396
397    /// Add metadata
398    pub fn with_metadata(mut self, key: String, value: String) -> Self {
399        self.metadata.insert(key, value);
400        self
401    }
402}
403
404impl ConsensusVote {
405    /// Create new consensus vote
406    pub fn new(voter: String, vote: VoteDecision) -> Self {
407        Self {
408            voter,
409            vote,
410            timestamp: SystemTime::now(),
411            signature: None,
412            metadata: HashMap::new(),
413        }
414    }
415
416    /// Add signature to vote
417    pub fn with_signature(mut self, signature: String) -> Self {
418        self.signature = Some(signature);
419        self
420    }
421
422    /// Add metadata to vote
423    pub fn with_metadata(mut self, key: String, value: String) -> Self {
424        self.metadata.insert(key, value);
425        self
426    }
427}
428
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration selects Raft, a quorum of three, and has
    /// optimization switched on.
    #[test]
    fn test_consensus_config_default() {
        let ConsensusConfig {
            algorithm,
            quorum_size,
            enable_optimization,
            ..
        } = ConsensusConfig::default();

        assert_eq!(algorithm, ConsensusAlgorithm::Raft);
        assert_eq!(quorum_size, 3);
        assert!(enable_optimization);
    }

    /// `ConsensusProposal::new` stores the identifiers it is given and starts
    /// at priority zero.
    #[test]
    fn test_consensus_proposal() {
        let id = String::from("test_proposal");
        let origin = String::from("node1");
        let payload = String::from("test_data");
        let created = ConsensusProposal::new(id, origin, payload);

        assert_eq!(created.proposal_id, "test_proposal");
        assert_eq!(created.proposer, "node1");
        assert_eq!(created.priority, 0);
    }

    /// `ConsensusVote::new` records voter and decision and leaves the vote unsigned.
    #[test]
    fn test_consensus_vote() {
        let ballot = ConsensusVote::new(String::from("node1"), VoteDecision::Accept);

        assert_eq!(ballot.voter, "node1");
        assert_eq!(ballot.vote, VoteDecision::Accept);
        assert!(ballot.signature.is_none());
    }

    /// Derived `Default` statistics start with zero decisions and zero health.
    #[test]
    fn test_consensus_stats() {
        let ConsensusStats {
            total_decisions,
            health_score,
            ..
        } = ConsensusStats::default();

        assert_eq!(total_decisions, 0);
        assert_eq!(health_score, 0.0);
    }
}