1use crate::error::{MetricsError, Result};
10use scirs2_core::random::Rng;
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, VecDeque};
13use std::net::SocketAddr;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant, SystemTime};
16
17pub use super::config::{ConsensusAlgorithm, ConsensusConfig};
18
19pub trait ConsensusManager: Send + Sync {
21 fn start(&mut self) -> Result<()>;
23 fn propose(&mut self, data: Vec<u8>) -> Result<String>;
25 fn get_state(&self) -> ConsensusState;
27}
28
29#[derive(Debug)]
31pub struct RaftConsensus {
32 node_id: String,
34 current_term: u64,
36 voted_for: Option<String>,
38 log: Vec<LogEntry>,
40 state: NodeState,
42 peers: HashMap<String, PeerState>,
44 config: ConsensusConfig,
46 last_heartbeat: Instant,
48 election_timeout: Duration,
50 next_index: HashMap<String, usize>,
52 match_index: HashMap<String, usize>,
54}
55
56impl RaftConsensus {
57 pub fn new(node_id: String, peers: Vec<String>, config: ConsensusConfig) -> Self {
59 let mut peer_states = HashMap::new();
60 for peer in peers {
61 peer_states.insert(
62 peer.clone(),
63 PeerState {
64 id: peer,
65 last_seen: Instant::now(),
66 is_healthy: true,
67 address: None,
68 },
69 );
70 }
71
72 Self {
73 node_id,
74 current_term: 0,
75 voted_for: None,
76 log: vec![],
77 state: NodeState::Follower,
78 peers: peer_states,
79 config,
80 last_heartbeat: Instant::now(),
81 election_timeout: Duration::from_millis(5000),
82 next_index: HashMap::new(),
83 match_index: HashMap::new(),
84 }
85 }
86
87 pub fn start_election(&mut self) -> Result<()> {
89 self.current_term += 1;
90 self.state = NodeState::Candidate;
91 self.voted_for = Some(self.node_id.clone());
92 self.last_heartbeat = Instant::now();
93
94 let base_timeout = self.config.election_timeout_ms;
96 let jitter = scirs2_core::random::rng().random_range(0..base_timeout / 2);
97 self.election_timeout = Duration::from_millis(base_timeout + jitter);
98
99 Ok(())
103 }
104
105 pub fn append_entries(&mut self, entries: Vec<LogEntry>) -> Result<bool> {
107 for entry in entries {
109 self.log.push(entry);
110 }
111 Ok(true)
112 }
113
114 pub fn handle_vote_request(&mut self, term: u64, candidate_id: String) -> Result<bool> {
116 if term > self.current_term {
117 self.current_term = term;
118 self.voted_for = None;
119 self.state = NodeState::Follower;
120 }
121
122 let can_vote = self.voted_for.is_none() || self.voted_for.as_ref() == Some(&candidate_id);
123
124 if term == self.current_term && can_vote {
125 self.voted_for = Some(candidate_id);
126 Ok(true)
127 } else {
128 Ok(false)
129 }
130 }
131
132 pub fn become_leader(&mut self) -> Result<()> {
134 self.state = NodeState::Leader;
135
136 let log_length = self.log.len();
138 for peer_id in self.peers.keys() {
139 self.next_index.insert(peer_id.clone(), log_length);
140 self.match_index.insert(peer_id.clone(), 0);
141 }
142
143 Ok(())
144 }
145
146 pub fn send_heartbeat(&mut self) -> Result<()> {
148 if self.state != NodeState::Leader {
149 return Ok(());
150 }
151
152 self.last_heartbeat = Instant::now();
153
154 Ok(())
158 }
159
160 pub fn is_election_timeout(&self) -> bool {
162 self.last_heartbeat.elapsed() > self.election_timeout
163 }
164
165 pub fn log_length(&self) -> usize {
167 self.log.len()
168 }
169
170 pub fn current_term(&self) -> u64 {
172 self.current_term
173 }
174
175 pub fn current_state(&self) -> &NodeState {
177 &self.state
178 }
179}
180
181impl ConsensusManager for RaftConsensus {
182 fn start(&mut self) -> Result<()> {
183 self.last_heartbeat = Instant::now();
185 Ok(())
186 }
187
188 fn propose(&mut self, data: Vec<u8>) -> Result<String> {
189 if self.state != NodeState::Leader {
190 return Err(MetricsError::ConsensusError(
191 "Only leader can propose entries".to_string(),
192 ));
193 }
194
195 let entry = LogEntry {
196 term: self.current_term,
197 index: self.log.len() as u64,
198 command: Command::UserData(data),
199 timestamp: SystemTime::now(),
200 };
201
202 let entry_id = format!("entry_{}_{}", self.current_term, entry.index);
203 self.log.push(entry);
204
205 Ok(entry_id)
208 }
209
210 fn get_state(&self) -> ConsensusState {
211 ConsensusState {
212 term: self.current_term,
213 leader: if self.state == NodeState::Leader {
214 Some(self.node_id.clone())
215 } else {
216 None
217 },
218 node_state: self.state.clone(),
219 log_length: self.log.len(),
220 committed_index: 0, }
222 }
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize)]
227pub struct ConsensusState {
228 pub term: u64,
230 pub leader: Option<String>,
232 pub node_state: NodeState,
234 pub log_length: usize,
236 pub committed_index: usize,
238}
239
240#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
242pub enum NodeState {
243 Follower,
245 Candidate,
247 Leader,
249}
250
251#[derive(Debug, Clone, Serialize, Deserialize)]
253pub struct LogEntry {
254 pub term: u64,
256 pub index: u64,
258 pub command: Command,
260 pub timestamp: SystemTime,
262}
263
264#[derive(Debug, Clone, Serialize, Deserialize)]
266pub enum Command {
267 NoOp,
269 UserData(Vec<u8>),
271 ConfigChange {
273 change_type: ConfigChangeType,
275 node_id: String,
277 address: Option<SocketAddr>,
279 },
280 Snapshot {
282 last_included_index: u64,
284 last_included_term: u64,
286 data: Vec<u8>,
288 },
289}
290
291#[derive(Debug, Clone, Serialize, Deserialize)]
293pub enum ConfigChangeType {
294 AddNode,
296 RemoveNode,
298 UpdateNode,
300}
301
302#[derive(Debug, Clone)]
304pub struct PeerState {
305 pub id: String,
307 pub last_seen: Instant,
309 pub is_healthy: bool,
311 pub address: Option<SocketAddr>,
313}
314
315#[derive(Debug)]
317pub struct PbftConsensus {
318 node_id: String,
320 current_view: u64,
322 sequence_number: u64,
324 peers: HashMap<String, PeerState>,
326 config: ConsensusConfig,
328 message_log: Vec<PbftMessage>,
330}
331
332impl PbftConsensus {
333 pub fn new(node_id: String, peers: Vec<String>, config: ConsensusConfig) -> Self {
335 let mut peer_states = HashMap::new();
336 for peer in peers {
337 peer_states.insert(
338 peer.clone(),
339 PeerState {
340 id: peer,
341 last_seen: Instant::now(),
342 is_healthy: true,
343 address: None,
344 },
345 );
346 }
347
348 Self {
349 node_id,
350 current_view: 0,
351 sequence_number: 0,
352 peers: peer_states,
353 config,
354 message_log: vec![],
355 }
356 }
357
358 pub fn has_quorum(&self) -> bool {
360 let total_nodes = self.peers.len() + 1; let healthy_nodes = self.peers.values().filter(|p| p.is_healthy).count() + 1;
362
363 healthy_nodes > (total_nodes * 2 / 3)
366 }
367
368 pub fn start_consensus(&mut self, request: Vec<u8>) -> Result<String> {
370 if !self.has_quorum() {
371 return Err(MetricsError::ConsensusError(
372 "Insufficient nodes for consensus".to_string(),
373 ));
374 }
375
376 self.sequence_number += 1;
377 let message = PbftMessage {
378 message_type: PbftMessageType::PrePrepare,
379 view: self.current_view,
380 sequence: self.sequence_number,
381 digest: self.compute_digest(&request),
382 node_id: self.node_id.clone(),
383 data: request,
384 timestamp: SystemTime::now(),
385 };
386
387 self.message_log.push(message.clone());
388
389 Ok(format!(
392 "pbft_{}_{}",
393 self.current_view, self.sequence_number
394 ))
395 }
396
397 fn compute_digest(&self, data: &[u8]) -> String {
398 use std::collections::hash_map::DefaultHasher;
400 use std::hash::{Hash, Hasher};
401
402 let mut hasher = DefaultHasher::new();
403 data.hash(&mut hasher);
404 format!("{:x}", hasher.finish())
405 }
406}
407
408impl ConsensusManager for PbftConsensus {
409 fn start(&mut self) -> Result<()> {
410 self.current_view = 0;
412 self.sequence_number = 0;
413 Ok(())
414 }
415
416 fn propose(&mut self, data: Vec<u8>) -> Result<String> {
417 self.start_consensus(data)
418 }
419
420 fn get_state(&self) -> ConsensusState {
421 ConsensusState {
422 term: self.current_view,
423 leader: Some(format!(
424 "primary_{}",
425 self.current_view % (self.peers.len() + 1) as u64
426 )),
427 node_state: NodeState::Follower, log_length: self.message_log.len(),
429 committed_index: 0, }
431 }
432}
433
434#[derive(Debug, Clone, Serialize, Deserialize)]
436pub enum PbftMessageType {
437 PrePrepare,
439 Prepare,
441 Commit,
443 ViewChange,
445 NewView,
447}
448
449#[derive(Debug, Clone, Serialize, Deserialize)]
451pub struct PbftMessage {
452 pub message_type: PbftMessageType,
454 pub view: u64,
456 pub sequence: u64,
458 pub digest: String,
460 pub node_id: String,
462 pub data: Vec<u8>,
464 pub timestamp: SystemTime,
466}
467
468#[derive(Debug)]
470pub struct SimpleMajorityConsensus {
471 node_id: String,
473 peers: HashMap<String, PeerState>,
475 votes: VecDeque<Vote>,
477 config: ConsensusConfig,
479}
480
481impl SimpleMajorityConsensus {
482 pub fn new(node_id: String, peers: Vec<String>, config: ConsensusConfig) -> Self {
484 let mut peer_states = HashMap::new();
485 for peer in peers {
486 peer_states.insert(
487 peer.clone(),
488 PeerState {
489 id: peer,
490 last_seen: Instant::now(),
491 is_healthy: true,
492 address: None,
493 },
494 );
495 }
496
497 Self {
498 node_id,
499 peers: peer_states,
500 votes: VecDeque::new(),
501 config,
502 }
503 }
504
505 pub fn submit_proposal(&mut self, proposal: Vec<u8>) -> Result<String> {
507 let proposal_id = format!(
508 "proposal_{}_{}",
509 SystemTime::now()
510 .duration_since(std::time::UNIX_EPOCH)
511 .expect("Operation failed")
512 .as_millis(),
513 scirs2_core::random::rng().random::<u64>()
514 );
515
516 let vote = Vote {
517 proposal_id: proposal_id.clone(),
518 proposal_data: proposal,
519 votes_for: 1, votes_against: 0,
521 voters: vec![self.node_id.clone()],
522 timestamp: SystemTime::now(),
523 };
524
525 self.votes.push_back(vote);
526
527 while self.votes.len() > 1000 {
529 self.votes.pop_front();
530 }
531
532 Ok(proposal_id)
535 }
536
537 pub fn has_majority(&self, proposal_id: &str) -> bool {
539 if let Some(vote) = self.votes.iter().find(|v| v.proposal_id == proposal_id) {
540 let total_nodes = self.peers.len() + 1; vote.votes_for > total_nodes / 2
542 } else {
543 false
544 }
545 }
546}
547
548impl ConsensusManager for SimpleMajorityConsensus {
549 fn start(&mut self) -> Result<()> {
550 self.votes.clear();
552 Ok(())
553 }
554
555 fn propose(&mut self, data: Vec<u8>) -> Result<String> {
556 self.submit_proposal(data)
557 }
558
559 fn get_state(&self) -> ConsensusState {
560 ConsensusState {
561 term: 0, leader: Some(self.node_id.clone()), node_state: NodeState::Leader, log_length: self.votes.len(),
565 committed_index: 0, }
567 }
568}
569
570#[derive(Debug, Clone)]
572pub struct Vote {
573 pub proposal_id: String,
575 pub proposal_data: Vec<u8>,
577 pub votes_for: usize,
579 pub votes_against: usize,
581 pub voters: Vec<String>,
583 pub timestamp: SystemTime,
585}
586
587pub struct ConsensusFactory;
589
590impl ConsensusFactory {
591 pub fn create_consensus(
593 algorithm: ConsensusAlgorithm,
594 node_id: String,
595 peers: Vec<String>,
596 config: ConsensusConfig,
597 ) -> Result<Box<dyn ConsensusManager>> {
598 match algorithm {
599 ConsensusAlgorithm::Raft => Ok(Box::new(RaftConsensus::new(node_id, peers, config))),
600 ConsensusAlgorithm::Pbft => Ok(Box::new(PbftConsensus::new(node_id, peers, config))),
601 ConsensusAlgorithm::SimpleMajority => Ok(Box::new(SimpleMajorityConsensus::new(
602 node_id, peers, config,
603 ))),
604 _ => Err(MetricsError::ConsensusError(format!(
605 "Consensus algorithm {:?} not implemented",
606 algorithm
607 ))),
608 }
609 }
610}
611
612#[cfg(test)]
613mod tests {
614 use super::*;
615
616 #[test]
617 fn test_raft_consensus_creation() {
618 let config = ConsensusConfig::default();
619 let peers = vec!["node1".to_string(), "node2".to_string()];
620 let raft = RaftConsensus::new("node0".to_string(), peers, config);
621
622 assert_eq!(raft.current_term(), 0);
623 assert_eq!(*raft.current_state(), NodeState::Follower);
624 assert_eq!(raft.log_length(), 0);
625 }
626
627 #[test]
628 fn test_raft_election() {
629 let config = ConsensusConfig::default();
630 let peers = vec!["node1".to_string(), "node2".to_string()];
631 let mut raft = RaftConsensus::new("node0".to_string(), peers, config);
632
633 raft.start_election().expect("Operation failed");
634 assert_eq!(*raft.current_state(), NodeState::Candidate);
635 assert_eq!(raft.current_term(), 1);
636 }
637
638 #[test]
639 fn test_pbft_consensus_creation() {
640 let config = ConsensusConfig::default();
641 let peers = vec![
642 "node1".to_string(),
643 "node2".to_string(),
644 "node3".to_string(),
645 ];
646 let pbft = PbftConsensus::new("node0".to_string(), peers, config);
647
648 assert!(pbft.has_quorum());
649 }
650
651 #[test]
652 fn test_simple_majority_consensus() {
653 let config = ConsensusConfig::default();
654 let peers = vec!["node1".to_string(), "node2".to_string()];
655 let mut consensus = SimpleMajorityConsensus::new("node0".to_string(), peers, config);
656
657 let proposal_id = consensus
658 .submit_proposal(b"test proposal".to_vec())
659 .expect("Operation failed");
660 assert!(!consensus.has_majority(&proposal_id)); }
662
663 #[test]
664 fn test_consensus_factory() {
665 let config = ConsensusConfig::default();
666 let peers = vec!["node1".to_string()];
667
668 let raft = ConsensusFactory::create_consensus(
669 ConsensusAlgorithm::Raft,
670 "node0".to_string(),
671 peers.clone(),
672 config.clone(),
673 );
674 assert!(raft.is_ok());
675
676 let pbft = ConsensusFactory::create_consensus(
677 ConsensusAlgorithm::Pbft,
678 "node0".to_string(),
679 peers.clone(),
680 config.clone(),
681 );
682 assert!(pbft.is_ok());
683
684 let simple = ConsensusFactory::create_consensus(
685 ConsensusAlgorithm::SimpleMajority,
686 "node0".to_string(),
687 peers,
688 config,
689 );
690 assert!(simple.is_ok());
691 }
692}