// scirs2_metrics/optimization/distributed/consensus/mod.rs

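//! Consensus algorithms for distributed coordination
//!
//! This module provides implementations of the following consensus algorithms:
//! - Raft consensus algorithm
//! - Practical Byzantine Fault Tolerance (PBFT), in simplified form
//! - Simple majority voting
//!
//! Proof of Stake consensus is planned but has no implementation in this module
//! yet. Network communication is also still a TODO: the types here track each
//! node's local protocol state only.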
8
9use crate::error::{MetricsError, Result};
10use scirs2_core::random::Rng;
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, VecDeque};
13use std::net::SocketAddr;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant, SystemTime};
16
17pub use super::config::{ConsensusAlgorithm, ConsensusConfig};
18
19/// Trait for consensus algorithm implementations
20pub trait ConsensusManager: Send + Sync {
21    /// Start the consensus algorithm
22    fn start(&mut self) -> Result<()>;
23    /// Submit a proposal for consensus
24    fn propose(&mut self, data: Vec<u8>) -> Result<String>;
25    /// Get the current consensus state
26    fn get_state(&self) -> ConsensusState;
27}
28
29/// Raft consensus algorithm implementation
30#[derive(Debug)]
31pub struct RaftConsensus {
32    /// Node ID
33    node_id: String,
34    /// Current term
35    current_term: u64,
36    /// Voted for in current term
37    voted_for: Option<String>,
38    /// Log entries
39    log: Vec<LogEntry>,
40    /// Current state
41    state: NodeState,
42    /// Known peers
43    peers: HashMap<String, PeerState>,
44    /// Configuration
45    config: ConsensusConfig,
46    /// Last heartbeat time
47    last_heartbeat: Instant,
48    /// Election timeout
49    election_timeout: Duration,
50    /// Next index for each peer
51    next_index: HashMap<String, usize>,
52    /// Match index for each peer
53    match_index: HashMap<String, usize>,
54}
55
56impl RaftConsensus {
57    /// Create a new Raft consensus instance
58    pub fn new(node_id: String, peers: Vec<String>, config: ConsensusConfig) -> Self {
59        let mut peer_states = HashMap::new();
60        for peer in peers {
61            peer_states.insert(
62                peer.clone(),
63                PeerState {
64                    id: peer,
65                    last_seen: Instant::now(),
66                    is_healthy: true,
67                    address: None,
68                },
69            );
70        }
71
72        Self {
73            node_id,
74            current_term: 0,
75            voted_for: None,
76            log: vec![],
77            state: NodeState::Follower,
78            peers: peer_states,
79            config,
80            last_heartbeat: Instant::now(),
81            election_timeout: Duration::from_millis(5000),
82            next_index: HashMap::new(),
83            match_index: HashMap::new(),
84        }
85    }
86
87    /// Start an election
88    pub fn start_election(&mut self) -> Result<()> {
89        self.current_term += 1;
90        self.state = NodeState::Candidate;
91        self.voted_for = Some(self.node_id.clone());
92        self.last_heartbeat = Instant::now();
93
94        // Reset election timeout with randomization
95        let base_timeout = self.config.election_timeout_ms;
96        let jitter = scirs2_core::random::rng().random_range(0..base_timeout / 2);
97        self.election_timeout = Duration::from_millis(base_timeout + jitter);
98
99        // TODO: Send vote requests to all peers
100        // This would be implemented with actual network communication
101
102        Ok(())
103    }
104
105    /// Append entries to log
106    pub fn append_entries(&mut self, entries: Vec<LogEntry>) -> Result<bool> {
107        // Simplified append entries implementation
108        for entry in entries {
109            self.log.push(entry);
110        }
111        Ok(true)
112    }
113
114    /// Handle vote request
115    pub fn handle_vote_request(&mut self, term: u64, candidate_id: String) -> Result<bool> {
116        if term > self.current_term {
117            self.current_term = term;
118            self.voted_for = None;
119            self.state = NodeState::Follower;
120        }
121
122        let can_vote = self.voted_for.is_none() || self.voted_for.as_ref() == Some(&candidate_id);
123
124        if term == self.current_term && can_vote {
125            self.voted_for = Some(candidate_id);
126            Ok(true)
127        } else {
128            Ok(false)
129        }
130    }
131
132    /// Become leader
133    pub fn become_leader(&mut self) -> Result<()> {
134        self.state = NodeState::Leader;
135
136        // Initialize next_index and match_index for all peers
137        let log_length = self.log.len();
138        for peer_id in self.peers.keys() {
139            self.next_index.insert(peer_id.clone(), log_length);
140            self.match_index.insert(peer_id.clone(), 0);
141        }
142
143        Ok(())
144    }
145
146    /// Send heartbeat to peers
147    pub fn send_heartbeat(&mut self) -> Result<()> {
148        if self.state != NodeState::Leader {
149            return Ok(());
150        }
151
152        self.last_heartbeat = Instant::now();
153
154        // TODO: Send heartbeat messages to all peers
155        // This would be implemented with actual network communication
156
157        Ok(())
158    }
159
160    /// Check if election timeout has occurred
161    pub fn is_election_timeout(&self) -> bool {
162        self.last_heartbeat.elapsed() > self.election_timeout
163    }
164
165    /// Get current log length
166    pub fn log_length(&self) -> usize {
167        self.log.len()
168    }
169
170    /// Get current term
171    pub fn current_term(&self) -> u64 {
172        self.current_term
173    }
174
175    /// Get current state
176    pub fn current_state(&self) -> &NodeState {
177        &self.state
178    }
179}
180
181impl ConsensusManager for RaftConsensus {
182    fn start(&mut self) -> Result<()> {
183        // Initialize Raft consensus
184        self.last_heartbeat = Instant::now();
185        Ok(())
186    }
187
188    fn propose(&mut self, data: Vec<u8>) -> Result<String> {
189        if self.state != NodeState::Leader {
190            return Err(MetricsError::ConsensusError(
191                "Only leader can propose entries".to_string(),
192            ));
193        }
194
195        let entry = LogEntry {
196            term: self.current_term,
197            index: self.log.len() as u64,
198            command: Command::UserData(data),
199            timestamp: SystemTime::now(),
200        };
201
202        let entry_id = format!("entry_{}_{}", self.current_term, entry.index);
203        self.log.push(entry);
204
205        // TODO: Replicate to followers
206
207        Ok(entry_id)
208    }
209
210    fn get_state(&self) -> ConsensusState {
211        ConsensusState {
212            term: self.current_term,
213            leader: if self.state == NodeState::Leader {
214                Some(self.node_id.clone())
215            } else {
216                None
217            },
218            node_state: self.state.clone(),
219            log_length: self.log.len(),
220            committed_index: 0, // Simplified
221        }
222    }
223}
224
225/// Current consensus state
226#[derive(Debug, Clone, Serialize, Deserialize)]
227pub struct ConsensusState {
228    /// Current term
229    pub term: u64,
230    /// Current leader (if known)
231    pub leader: Option<String>,
232    /// Node state
233    pub node_state: NodeState,
234    /// Log length
235    pub log_length: usize,
236    /// Last committed index
237    pub committed_index: usize,
238}
239
240/// Node states in Raft
241#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
242pub enum NodeState {
243    /// Follower state
244    Follower,
245    /// Candidate state (during election)
246    Candidate,
247    /// Leader state
248    Leader,
249}
250
251/// Log entry in Raft
252#[derive(Debug, Clone, Serialize, Deserialize)]
253pub struct LogEntry {
254    /// Term when entry was created
255    pub term: u64,
256    /// Index in the log
257    pub index: u64,
258    /// Command to apply
259    pub command: Command,
260    /// Timestamp when entry was created
261    pub timestamp: SystemTime,
262}
263
264/// Commands that can be stored in the log
265#[derive(Debug, Clone, Serialize, Deserialize)]
266pub enum Command {
267    /// No-op command (used for heartbeats)
268    NoOp,
269    /// User data
270    UserData(Vec<u8>),
271    /// Configuration change
272    ConfigChange {
273        /// Type of change
274        change_type: ConfigChangeType,
275        /// Node ID
276        node_id: String,
277        /// Node address
278        address: Option<SocketAddr>,
279    },
280    /// Snapshot command
281    Snapshot {
282        /// Last included index
283        last_included_index: u64,
284        /// Last included term
285        last_included_term: u64,
286        /// Snapshot data
287        data: Vec<u8>,
288    },
289}
290
291/// Configuration change types
292#[derive(Debug, Clone, Serialize, Deserialize)]
293pub enum ConfigChangeType {
294    /// Add a new node
295    AddNode,
296    /// Remove an existing node
297    RemoveNode,
298    /// Update node address
299    UpdateNode,
300}
301
/// What this node knows about one remote cluster member.
#[derive(Clone, Debug)]
pub struct PeerState {
    /// The peer's node ID.
    pub id: String,
    /// Last time this peer was heard from.
    pub last_seen: Instant,
    /// `true` while the peer is considered healthy.
    pub is_healthy: bool,
    /// The peer's network address, if known.
    pub address: Option<SocketAddr>,
}
314
315/// PBFT consensus implementation (simplified)
316#[derive(Debug)]
317pub struct PbftConsensus {
318    /// Node ID
319    node_id: String,
320    /// Current view
321    current_view: u64,
322    /// Current sequence number
323    sequence_number: u64,
324    /// Known peers
325    peers: HashMap<String, PeerState>,
326    /// Configuration
327    config: ConsensusConfig,
328    /// Message log for three-phase protocol
329    message_log: Vec<PbftMessage>,
330}
331
332impl PbftConsensus {
333    /// Create a new PBFT consensus instance
334    pub fn new(node_id: String, peers: Vec<String>, config: ConsensusConfig) -> Self {
335        let mut peer_states = HashMap::new();
336        for peer in peers {
337            peer_states.insert(
338                peer.clone(),
339                PeerState {
340                    id: peer,
341                    last_seen: Instant::now(),
342                    is_healthy: true,
343                    address: None,
344                },
345            );
346        }
347
348        Self {
349            node_id,
350            current_view: 0,
351            sequence_number: 0,
352            peers: peer_states,
353            config,
354            message_log: vec![],
355        }
356    }
357
358    /// Check if we have enough replicas for consensus
359    pub fn has_quorum(&self) -> bool {
360        let total_nodes = self.peers.len() + 1; // +1 for self
361        let healthy_nodes = self.peers.values().filter(|p| p.is_healthy).count() + 1;
362
363        // PBFT requires 3f + 1 nodes to tolerate f Byzantine failures
364        // For simplicity, we use 2f + 1 for non-Byzantine consensus
365        healthy_nodes > (total_nodes * 2 / 3)
366    }
367
368    /// Start PBFT three-phase protocol
369    pub fn start_consensus(&mut self, request: Vec<u8>) -> Result<String> {
370        if !self.has_quorum() {
371            return Err(MetricsError::ConsensusError(
372                "Insufficient nodes for consensus".to_string(),
373            ));
374        }
375
376        self.sequence_number += 1;
377        let message = PbftMessage {
378            message_type: PbftMessageType::PrePrepare,
379            view: self.current_view,
380            sequence: self.sequence_number,
381            digest: self.compute_digest(&request),
382            node_id: self.node_id.clone(),
383            data: request,
384            timestamp: SystemTime::now(),
385        };
386
387        self.message_log.push(message.clone());
388
389        // TODO: Send pre-prepare to all replicas
390
391        Ok(format!(
392            "pbft_{}_{}",
393            self.current_view, self.sequence_number
394        ))
395    }
396
397    fn compute_digest(&self, data: &[u8]) -> String {
398        // Simplified hash computation
399        use std::collections::hash_map::DefaultHasher;
400        use std::hash::{Hash, Hasher};
401
402        let mut hasher = DefaultHasher::new();
403        data.hash(&mut hasher);
404        format!("{:x}", hasher.finish())
405    }
406}
407
408impl ConsensusManager for PbftConsensus {
409    fn start(&mut self) -> Result<()> {
410        // Initialize PBFT
411        self.current_view = 0;
412        self.sequence_number = 0;
413        Ok(())
414    }
415
416    fn propose(&mut self, data: Vec<u8>) -> Result<String> {
417        self.start_consensus(data)
418    }
419
420    fn get_state(&self) -> ConsensusState {
421        ConsensusState {
422            term: self.current_view,
423            leader: Some(format!(
424                "primary_{}",
425                self.current_view % (self.peers.len() + 1) as u64
426            )),
427            node_state: NodeState::Follower, // Simplified
428            log_length: self.message_log.len(),
429            committed_index: 0, // Simplified
430        }
431    }
432}
433
434/// PBFT message types
435#[derive(Debug, Clone, Serialize, Deserialize)]
436pub enum PbftMessageType {
437    /// Pre-prepare phase
438    PrePrepare,
439    /// Prepare phase
440    Prepare,
441    /// Commit phase
442    Commit,
443    /// View change
444    ViewChange,
445    /// New view
446    NewView,
447}
448
449/// PBFT protocol message
450#[derive(Debug, Clone, Serialize, Deserialize)]
451pub struct PbftMessage {
452    /// Message type
453    pub message_type: PbftMessageType,
454    /// Current view number
455    pub view: u64,
456    /// Sequence number
457    pub sequence: u64,
458    /// Message digest
459    pub digest: String,
460    /// Sender node ID
461    pub node_id: String,
462    /// Message data
463    pub data: Vec<u8>,
464    /// Timestamp
465    pub timestamp: SystemTime,
466}
467
468/// Simple majority consensus (for testing/fallback)
469#[derive(Debug)]
470pub struct SimpleMajorityConsensus {
471    /// Node ID
472    node_id: String,
473    /// Known peers
474    peers: HashMap<String, PeerState>,
475    /// Vote history
476    votes: VecDeque<Vote>,
477    /// Configuration
478    config: ConsensusConfig,
479}
480
481impl SimpleMajorityConsensus {
482    /// Create a new simple majority consensus instance
483    pub fn new(node_id: String, peers: Vec<String>, config: ConsensusConfig) -> Self {
484        let mut peer_states = HashMap::new();
485        for peer in peers {
486            peer_states.insert(
487                peer.clone(),
488                PeerState {
489                    id: peer,
490                    last_seen: Instant::now(),
491                    is_healthy: true,
492                    address: None,
493                },
494            );
495        }
496
497        Self {
498            node_id,
499            peers: peer_states,
500            votes: VecDeque::new(),
501            config,
502        }
503    }
504
505    /// Submit a proposal for voting
506    pub fn submit_proposal(&mut self, proposal: Vec<u8>) -> Result<String> {
507        let proposal_id = format!(
508            "proposal_{}_{}",
509            SystemTime::now()
510                .duration_since(std::time::UNIX_EPOCH)
511                .expect("Operation failed")
512                .as_millis(),
513            scirs2_core::random::rng().random::<u64>()
514        );
515
516        let vote = Vote {
517            proposal_id: proposal_id.clone(),
518            proposal_data: proposal,
519            votes_for: 1, // Self vote
520            votes_against: 0,
521            voters: vec![self.node_id.clone()],
522            timestamp: SystemTime::now(),
523        };
524
525        self.votes.push_back(vote);
526
527        // Cleanup old votes
528        while self.votes.len() > 1000 {
529            self.votes.pop_front();
530        }
531
532        // TODO: Send vote request to peers
533
534        Ok(proposal_id)
535    }
536
537    /// Check if proposal has majority
538    pub fn has_majority(&self, proposal_id: &str) -> bool {
539        if let Some(vote) = self.votes.iter().find(|v| v.proposal_id == proposal_id) {
540            let total_nodes = self.peers.len() + 1; // +1 for self
541            vote.votes_for > total_nodes / 2
542        } else {
543            false
544        }
545    }
546}
547
548impl ConsensusManager for SimpleMajorityConsensus {
549    fn start(&mut self) -> Result<()> {
550        // Initialize simple majority consensus
551        self.votes.clear();
552        Ok(())
553    }
554
555    fn propose(&mut self, data: Vec<u8>) -> Result<String> {
556        self.submit_proposal(data)
557    }
558
559    fn get_state(&self) -> ConsensusState {
560        ConsensusState {
561            term: 0,                            // No term concept in simple majority
562            leader: Some(self.node_id.clone()), // Everyone can propose
563            node_state: NodeState::Leader,      // Simplified
564            log_length: self.votes.len(),
565            committed_index: 0, // Simplified
566        }
567    }
568}
569
/// Tally for one proposal in simple majority consensus.
#[derive(Clone, Debug)]
pub struct Vote {
    /// Identifier returned when the proposal was submitted.
    pub proposal_id: String,
    /// Opaque payload being voted on.
    pub proposal_data: Vec<u8>,
    /// Votes in favour (the proposer's own vote is counted here).
    pub votes_for: usize,
    /// Votes against.
    pub votes_against: usize,
    /// IDs of the nodes that have voted.
    pub voters: Vec<String>,
    /// When the proposal was recorded.
    pub timestamp: SystemTime,
}
586
587/// Factory for creating consensus instances
588pub struct ConsensusFactory;
589
590impl ConsensusFactory {
591    /// Create a consensus manager based on configuration
592    pub fn create_consensus(
593        algorithm: ConsensusAlgorithm,
594        node_id: String,
595        peers: Vec<String>,
596        config: ConsensusConfig,
597    ) -> Result<Box<dyn ConsensusManager>> {
598        match algorithm {
599            ConsensusAlgorithm::Raft => Ok(Box::new(RaftConsensus::new(node_id, peers, config))),
600            ConsensusAlgorithm::Pbft => Ok(Box::new(PbftConsensus::new(node_id, peers, config))),
601            ConsensusAlgorithm::SimpleMajority => Ok(Box::new(SimpleMajorityConsensus::new(
602                node_id, peers, config,
603            ))),
604            _ => Err(MetricsError::ConsensusError(format!(
605                "Consensus algorithm {:?} not implemented",
606                algorithm
607            ))),
608        }
609    }
610}
611
#[cfg(test)]
mod tests {
    use super::*;

    /// Build owned peer IDs from string literals.
    fn ids(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    #[test]
    fn test_raft_consensus_creation() {
        let raft = RaftConsensus::new(
            "node0".to_string(),
            ids(&["node1", "node2"]),
            ConsensusConfig::default(),
        );

        assert_eq!(raft.current_term(), 0);
        assert_eq!(raft.current_state(), &NodeState::Follower);
        assert_eq!(raft.log_length(), 0);
    }

    #[test]
    fn test_raft_election() {
        let mut raft = RaftConsensus::new(
            "node0".to_string(),
            ids(&["node1", "node2"]),
            ConsensusConfig::default(),
        );

        raft.start_election().expect("Operation failed");

        assert_eq!(raft.current_state(), &NodeState::Candidate);
        assert_eq!(raft.current_term(), 1);
    }

    #[test]
    fn test_pbft_consensus_creation() {
        let pbft = PbftConsensus::new(
            "node0".to_string(),
            ids(&["node1", "node2", "node3"]),
            ConsensusConfig::default(),
        );

        assert!(pbft.has_quorum());
    }

    #[test]
    fn test_simple_majority_consensus() {
        let mut consensus = SimpleMajorityConsensus::new(
            "node0".to_string(),
            ids(&["node1", "node2"]),
            ConsensusConfig::default(),
        );

        let proposal_id = consensus
            .submit_proposal(b"test proposal".to_vec())
            .expect("Operation failed");

        // Only the proposer's own vote is recorded: 1 of 3 is not a majority.
        assert!(!consensus.has_majority(&proposal_id));
    }

    #[test]
    fn test_consensus_factory() {
        let config = ConsensusConfig::default();
        let peers = ids(&["node1"]);

        for algorithm in [
            ConsensusAlgorithm::Raft,
            ConsensusAlgorithm::Pbft,
            ConsensusAlgorithm::SimpleMajority,
        ] {
            let manager = ConsensusFactory::create_consensus(
                algorithm,
                "node0".to_string(),
                peers.clone(),
                config.clone(),
            );
            assert!(manager.is_ok());
        }
    }
}