1use crate::error::{MetricsError, Result};
10use crate::optimization::distributed::transport::{ConsensusMessage, Transport};
11use scirs2_core::random::{Rng, RngExt};
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, VecDeque};
14use std::net::SocketAddr;
15use std::sync::Arc;
16use std::time::{Duration, Instant, SystemTime};
17
18pub use super::config::{ConsensusAlgorithm, ConsensusConfig};
19
20pub trait ConsensusManager: Send + Sync {
22 fn start(&mut self) -> Result<()>;
24 fn propose(&mut self, data: Vec<u8>) -> Result<String>;
26 fn get_state(&self) -> ConsensusState;
28}
29
30pub struct RaftConsensus {
32 node_id: String,
34 current_term: u64,
36 voted_for: Option<String>,
38 log: Vec<LogEntry>,
40 state: NodeState,
42 peers: HashMap<String, PeerState>,
44 config: ConsensusConfig,
46 last_heartbeat: Instant,
48 election_timeout: Duration,
50 next_index: HashMap<String, usize>,
52 match_index: HashMap<String, usize>,
54 commit_index: u64,
56 transport: Option<Arc<dyn Transport>>,
58}
59
60impl std::fmt::Debug for RaftConsensus {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 f.debug_struct("RaftConsensus")
63 .field("node_id", &self.node_id)
64 .field("current_term", &self.current_term)
65 .field("state", &self.state)
66 .finish()
67 }
68}
69
70impl RaftConsensus {
71 pub fn new(node_id: String, peers: Vec<String>, config: ConsensusConfig) -> Self {
73 let mut peer_states = HashMap::new();
74 for peer in peers {
75 peer_states.insert(
76 peer.clone(),
77 PeerState {
78 id: peer,
79 last_seen: Instant::now(),
80 is_healthy: true,
81 address: None,
82 },
83 );
84 }
85
86 Self {
87 node_id,
88 current_term: 0,
89 voted_for: None,
90 log: vec![],
91 state: NodeState::Follower,
92 peers: peer_states,
93 config,
94 last_heartbeat: Instant::now(),
95 election_timeout: Duration::from_millis(5000),
96 next_index: HashMap::new(),
97 match_index: HashMap::new(),
98 commit_index: 0,
99 transport: None,
100 }
101 }
102
103 pub fn set_transport(&mut self, t: Arc<dyn Transport>) {
105 self.transport = Some(t);
106 }
107
108 pub fn poll_messages(&mut self) -> Result<()> {
110 let transport = match self.transport.clone() {
112 Some(t) => t,
113 None => return Ok(()),
114 };
115
116 let max_msgs = 256;
118 for _ in 0..max_msgs {
119 match transport.try_recv() {
120 None => break,
121 Some((from, msg)) => self.handle_message(from, msg)?,
122 }
123 }
124 Ok(())
125 }
126
127 fn handle_message(&mut self, from: String, msg: ConsensusMessage) -> Result<()> {
129 match msg {
130 ConsensusMessage::RequestVote {
131 term,
132 candidate_id,
133 last_log_index: _,
134 last_log_term: _,
135 } => {
136 let granted = self.handle_vote_request(term, candidate_id)?;
137 if let Some(ref transport) = self.transport.clone() {
138 let _ = transport.send(
139 &from,
140 ConsensusMessage::RequestVoteResponse {
141 term: self.current_term,
142 granted,
143 },
144 );
145 }
146 }
147 ConsensusMessage::AppendEntries {
148 term,
149 leader_id,
150 prev_log_index,
151 prev_log_term: _,
152 entries,
153 leader_commit,
154 } => {
155 let success =
156 self.handle_append_entries(term, &leader_id, entries, leader_commit)?;
157 let match_index = prev_log_index + if success { 1 } else { 0 };
158 if let Some(ref transport) = self.transport.clone() {
159 let _ = transport.send(
160 &from,
161 ConsensusMessage::AppendEntriesResponse {
162 term: self.current_term,
163 success,
164 match_index,
165 },
166 );
167 }
168 }
169 ConsensusMessage::RequestVoteResponse { term, granted } => {
170 if term > self.current_term {
171 self.current_term = term;
172 self.state = NodeState::Follower;
173 self.voted_for = None;
174 }
175 if granted && self.state == NodeState::Candidate {
178 }
181 }
182 ConsensusMessage::AppendEntriesResponse {
183 term,
184 success,
185 match_index,
186 } => {
187 if term > self.current_term {
188 self.current_term = term;
189 self.state = NodeState::Follower;
190 }
191 if success && match_index > self.commit_index {
192 self.commit_index = match_index;
193 }
194 }
195 _ => {} }
197 Ok(())
198 }
199
200 pub fn handle_append_entries(
202 &mut self,
203 term: u64,
204 leader_id: &str,
205 entries: Vec<Vec<u8>>,
206 leader_commit: u64,
207 ) -> Result<bool> {
208 if term < self.current_term {
209 return Ok(false);
210 }
211 self.current_term = term;
213 self.state = NodeState::Follower;
214 self.last_heartbeat = Instant::now();
215 self.voted_for = Some(leader_id.to_string());
216
217 for raw in entries {
219 let entry = LogEntry {
220 term,
221 index: self.log.len() as u64 + 1,
222 command: Command::UserData(raw),
223 timestamp: SystemTime::now(),
224 };
225 self.log.push(entry);
226 }
227
228 if leader_commit > self.commit_index {
230 self.commit_index = leader_commit.min(self.log.len() as u64);
231 }
232
233 Ok(true)
234 }
235
236 pub fn start_election(&mut self) -> Result<()> {
238 self.current_term += 1;
239 self.state = NodeState::Candidate;
240 self.voted_for = Some(self.node_id.clone());
241 self.last_heartbeat = Instant::now();
242
243 let base_timeout = self.config.election_timeout_ms;
245 let jitter = scirs2_core::random::rng().random_range(0..base_timeout / 2);
246 self.election_timeout = Duration::from_millis(base_timeout + jitter);
247
248 let transport = self.transport.clone();
250 if let Some(ref transport) = transport {
251 let peers = transport.peer_ids();
252 let last_log_index = self.log.len() as u64;
253 let last_log_term = self.log.last().map(|e| e.term).unwrap_or(0);
254
255 let msg = ConsensusMessage::RequestVote {
256 term: self.current_term,
257 candidate_id: self.node_id.clone(),
258 last_log_index,
259 last_log_term,
260 };
261 let _ = transport.broadcast(msg);
262
263 let mut votes = 1u64; let total = peers.len() as u64 + 1;
266
267 for _ in 0..peers.len() {
268 if let Some((_from, inner_msg)) = transport.try_recv() {
269 if let ConsensusMessage::RequestVoteResponse { term, granted } = inner_msg {
270 if term > self.current_term {
271 self.current_term = term;
272 self.state = NodeState::Follower;
273 return Ok(());
274 }
275 if granted {
276 votes += 1;
277 }
278 }
279 }
280 }
281
282 if votes > total / 2 {
283 self.become_leader()?;
284 }
285 }
286
287 Ok(())
288 }
289
290 pub fn append_entries(&mut self, entries: Vec<LogEntry>) -> Result<bool> {
292 for entry in entries {
293 self.log.push(entry);
294 }
295 Ok(true)
296 }
297
298 pub fn handle_vote_request(&mut self, term: u64, candidate_id: String) -> Result<bool> {
300 if term > self.current_term {
301 self.current_term = term;
302 self.voted_for = None;
303 self.state = NodeState::Follower;
304 }
305
306 let can_vote = self.voted_for.is_none() || self.voted_for.as_ref() == Some(&candidate_id);
307
308 if term == self.current_term && can_vote {
309 self.voted_for = Some(candidate_id);
310 Ok(true)
311 } else {
312 Ok(false)
313 }
314 }
315
316 pub fn become_leader(&mut self) -> Result<()> {
318 self.state = NodeState::Leader;
319
320 let log_length = self.log.len();
322 for peer_id in self.peers.keys().cloned().collect::<Vec<_>>() {
323 self.next_index.insert(peer_id.clone(), log_length);
324 self.match_index.insert(peer_id, 0);
325 }
326
327 Ok(())
328 }
329
330 pub fn send_heartbeat(&mut self) -> Result<()> {
332 if self.state != NodeState::Leader {
333 return Ok(());
334 }
335
336 self.last_heartbeat = Instant::now();
337
338 let transport = self.transport.clone();
339 if let Some(ref transport) = transport {
340 let msg = ConsensusMessage::AppendEntries {
341 term: self.current_term,
342 leader_id: self.node_id.clone(),
343 prev_log_index: self.commit_index,
344 prev_log_term: self
345 .log
346 .get(self.commit_index as usize)
347 .map(|e| e.term)
348 .unwrap_or(0),
349 entries: vec![],
350 leader_commit: self.commit_index,
351 };
352 let _ = transport.broadcast(msg);
353 }
354
355 Ok(())
356 }
357
358 pub fn is_election_timeout(&self) -> bool {
360 self.last_heartbeat.elapsed() > self.election_timeout
361 }
362
363 pub fn log_length(&self) -> usize {
365 self.log.len()
366 }
367
368 pub fn current_term(&self) -> u64 {
370 self.current_term
371 }
372
373 pub fn current_state(&self) -> &NodeState {
375 &self.state
376 }
377
378 pub fn commit_index(&self) -> u64 {
380 self.commit_index
381 }
382}
383
384impl ConsensusManager for RaftConsensus {
385 fn start(&mut self) -> Result<()> {
386 self.last_heartbeat = Instant::now();
387 Ok(())
388 }
389
390 fn propose(&mut self, data: Vec<u8>) -> Result<String> {
391 if self.state != NodeState::Leader {
392 return Err(MetricsError::ConsensusError(
393 "Only leader can propose entries".to_string(),
394 ));
395 }
396
397 let entry = LogEntry {
398 term: self.current_term,
399 index: self.log.len() as u64 + 1,
400 command: Command::UserData(data),
401 timestamp: SystemTime::now(),
402 };
403
404 let entry_id = format!("entry_{}_{}", self.current_term, entry.index);
405 self.log.push(entry.clone());
406
407 let transport = self.transport.clone();
409 if let Some(ref transport) = transport {
410 let entry_bytes = serde_json::to_vec(&entry.command).unwrap_or_default();
411 let prev_log_index = self.log.len() as u64 - 1;
412 let prev_log_term = if self.log.len() > 1 {
413 self.log[self.log.len() - 2].term
414 } else {
415 0
416 };
417
418 let msg = ConsensusMessage::AppendEntries {
419 term: self.current_term,
420 leader_id: self.node_id.clone(),
421 prev_log_index,
422 prev_log_term,
423 entries: vec![entry_bytes],
424 leader_commit: self.commit_index,
425 };
426 let _ = transport.broadcast(msg);
427
428 let peers = transport.peer_ids();
430 let total = peers.len() as u64 + 1;
431 let mut acks = 1u64; for _ in 0..peers.len() {
434 if let Some((_from, resp)) = transport.try_recv() {
435 if let ConsensusMessage::AppendEntriesResponse {
436 success,
437 match_index,
438 ..
439 } = resp
440 {
441 if success {
442 acks += 1;
443 if match_index > self.commit_index {
444 self.commit_index = match_index;
445 }
446 }
447 }
448 }
449 }
450
451 if acks > total / 2 {
452 self.commit_index = self.log.len() as u64;
453 }
454 }
455
456 Ok(entry_id)
457 }
458
459 fn get_state(&self) -> ConsensusState {
460 ConsensusState {
461 term: self.current_term,
462 leader: if self.state == NodeState::Leader {
463 Some(self.node_id.clone())
464 } else {
465 None
466 },
467 node_state: self.state.clone(),
468 log_length: self.log.len(),
469 committed_index: self.commit_index as usize,
470 }
471 }
472}
473
474#[derive(Debug, Clone, Serialize, Deserialize)]
476pub struct ConsensusState {
477 pub term: u64,
479 pub leader: Option<String>,
481 pub node_state: NodeState,
483 pub log_length: usize,
485 pub committed_index: usize,
487}
488
489#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
491pub enum NodeState {
492 Follower,
494 Candidate,
496 Leader,
498}
499
500#[derive(Debug, Clone, Serialize, Deserialize)]
502pub struct LogEntry {
503 pub term: u64,
505 pub index: u64,
507 pub command: Command,
509 pub timestamp: SystemTime,
511}
512
513#[derive(Debug, Clone, Serialize, Deserialize)]
515pub enum Command {
516 NoOp,
518 UserData(Vec<u8>),
520 ConfigChange {
522 change_type: ConfigChangeType,
524 node_id: String,
526 address: Option<SocketAddr>,
528 },
529 Snapshot {
531 last_included_index: u64,
533 last_included_term: u64,
535 data: Vec<u8>,
537 },
538}
539
540#[derive(Debug, Clone, Serialize, Deserialize)]
542pub enum ConfigChangeType {
543 AddNode,
545 RemoveNode,
547 UpdateNode,
549}
550
551#[derive(Debug, Clone)]
553pub struct PeerState {
554 pub id: String,
556 pub last_seen: Instant,
558 pub is_healthy: bool,
560 pub address: Option<SocketAddr>,
562}
563
564pub struct PbftConsensus {
570 node_id: String,
572 current_view: u64,
574 sequence_number: u64,
576 peers: HashMap<String, PeerState>,
578 config: ConsensusConfig,
580 message_log: Vec<PbftMessage>,
582 transport: Option<Arc<dyn Transport>>,
584}
585
586impl std::fmt::Debug for PbftConsensus {
587 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
588 f.debug_struct("PbftConsensus")
589 .field("node_id", &self.node_id)
590 .field("current_view", &self.current_view)
591 .finish()
592 }
593}
594
595impl PbftConsensus {
596 pub fn new(node_id: String, peers: Vec<String>, config: ConsensusConfig) -> Self {
598 let mut peer_states = HashMap::new();
599 for peer in peers {
600 peer_states.insert(
601 peer.clone(),
602 PeerState {
603 id: peer,
604 last_seen: Instant::now(),
605 is_healthy: true,
606 address: None,
607 },
608 );
609 }
610
611 Self {
612 node_id,
613 current_view: 0,
614 sequence_number: 0,
615 peers: peer_states,
616 config,
617 message_log: vec![],
618 transport: None,
619 }
620 }
621
622 pub fn set_transport(&mut self, t: Arc<dyn Transport>) {
624 self.transport = Some(t);
625 }
626
627 pub fn has_quorum(&self) -> bool {
629 let total_nodes = self.peers.len() + 1; let healthy_nodes = self.peers.values().filter(|p| p.is_healthy).count() + 1;
631
632 healthy_nodes > (total_nodes * 2 / 3)
635 }
636
637 fn f(&self) -> usize {
639 let n = self.peers.len() + 1;
641 (n - 1) / 3
642 }
643
644 pub fn start_consensus(&mut self, request: Vec<u8>) -> Result<String> {
646 if !self.has_quorum() {
647 return Err(MetricsError::ConsensusError(
648 "Insufficient nodes for consensus".to_string(),
649 ));
650 }
651
652 self.sequence_number += 1;
653 let digest = self.compute_digest(&request);
654 let seq = self.sequence_number;
655 let view = self.current_view;
656
657 let pre_prepare = PbftMessage {
659 message_type: PbftMessageType::PrePrepare,
660 view,
661 sequence: seq,
662 digest: digest.clone(),
663 node_id: self.node_id.clone(),
664 data: request.clone(),
665 timestamp: SystemTime::now(),
666 };
667 self.message_log.push(pre_prepare.clone());
668
669 let transport = self.transport.clone();
670 if let Some(ref transport) = transport {
671 let _ = transport.broadcast(ConsensusMessage::PbftPrePrepare {
672 view,
673 sequence: seq,
674 digest: digest.clone(),
675 data: request,
676 node_id: self.node_id.clone(),
677 });
678
679 let quorum = 2 * self.f() + 1;
681 let peers_count = self.peers.len();
682 let mut prepares = 1usize; for _ in 0..peers_count {
685 if let Some((_from, msg)) = transport.try_recv() {
686 if let ConsensusMessage::PbftPrepare {
687 sequence: s,
688 digest: d,
689 ..
690 } = &msg
691 {
692 if *s == seq && *d == digest {
693 prepares += 1;
694 let prepare_msg = PbftMessage {
695 message_type: PbftMessageType::Prepare,
696 view,
697 sequence: seq,
698 digest: digest.clone(),
699 node_id: self.node_id.clone(),
700 data: vec![],
701 timestamp: SystemTime::now(),
702 };
703 self.message_log.push(prepare_msg);
704 }
705 }
706 }
707 }
708
709 if prepares >= quorum {
711 let _ = transport.broadcast(ConsensusMessage::PbftCommit {
712 view,
713 sequence: seq,
714 digest: digest.clone(),
715 node_id: self.node_id.clone(),
716 });
717
718 let mut commits = 1usize;
720 for _ in 0..peers_count {
721 if let Some((_from, msg)) = transport.try_recv() {
722 if let ConsensusMessage::PbftCommit {
723 sequence: s,
724 digest: d,
725 ..
726 } = &msg
727 {
728 if *s == seq && *d == digest {
729 commits += 1;
730 let commit_msg = PbftMessage {
731 message_type: PbftMessageType::Commit,
732 view,
733 sequence: seq,
734 digest: digest.clone(),
735 node_id: self.node_id.clone(),
736 data: vec![],
737 timestamp: SystemTime::now(),
738 };
739 self.message_log.push(commit_msg);
740 }
741 }
742 }
743 }
744
745 if commits < quorum {
746 return Err(MetricsError::ConsensusError(
747 "PBFT commit quorum not reached".to_string(),
748 ));
749 }
750 } else {
751 return Err(MetricsError::ConsensusError(
752 "PBFT prepare quorum not reached".to_string(),
753 ));
754 }
755 }
756
757 Ok(format!("pbft_{}_{}", view, seq))
758 }
759
760 fn compute_digest(&self, data: &[u8]) -> String {
761 use std::collections::hash_map::DefaultHasher;
763 use std::hash::{Hash, Hasher};
764
765 let mut hasher = DefaultHasher::new();
766 data.hash(&mut hasher);
767 format!("{:x}", hasher.finish())
768 }
769}
770
771impl ConsensusManager for PbftConsensus {
772 fn start(&mut self) -> Result<()> {
773 self.current_view = 0;
774 self.sequence_number = 0;
775 Ok(())
776 }
777
778 fn propose(&mut self, data: Vec<u8>) -> Result<String> {
779 self.start_consensus(data)
780 }
781
782 fn get_state(&self) -> ConsensusState {
783 ConsensusState {
784 term: self.current_view,
785 leader: Some(format!(
786 "primary_{}",
787 self.current_view % (self.peers.len() + 1) as u64
788 )),
789 node_state: NodeState::Follower, log_length: self.message_log.len(),
791 committed_index: 0,
792 }
793 }
794}
795
796#[derive(Debug, Clone, Serialize, Deserialize)]
798pub enum PbftMessageType {
799 PrePrepare,
801 Prepare,
803 Commit,
805 ViewChange,
807 NewView,
809}
810
811#[derive(Debug, Clone, Serialize, Deserialize)]
813pub struct PbftMessage {
814 pub message_type: PbftMessageType,
816 pub view: u64,
818 pub sequence: u64,
820 pub digest: String,
822 pub node_id: String,
824 pub data: Vec<u8>,
826 pub timestamp: SystemTime,
828}
829
830pub struct SimpleMajorityConsensus {
836 node_id: String,
838 peers: HashMap<String, PeerState>,
840 votes: VecDeque<Vote>,
842 config: ConsensusConfig,
844 transport: Option<Arc<dyn Transport>>,
846}
847
848impl std::fmt::Debug for SimpleMajorityConsensus {
849 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
850 f.debug_struct("SimpleMajorityConsensus")
851 .field("node_id", &self.node_id)
852 .finish()
853 }
854}
855
856impl SimpleMajorityConsensus {
857 pub fn new(node_id: String, peers: Vec<String>, config: ConsensusConfig) -> Self {
859 let mut peer_states = HashMap::new();
860 for peer in peers {
861 peer_states.insert(
862 peer.clone(),
863 PeerState {
864 id: peer,
865 last_seen: Instant::now(),
866 is_healthy: true,
867 address: None,
868 },
869 );
870 }
871
872 Self {
873 node_id,
874 peers: peer_states,
875 votes: VecDeque::new(),
876 config,
877 transport: None,
878 }
879 }
880
881 pub fn set_transport(&mut self, t: Arc<dyn Transport>) {
883 self.transport = Some(t);
884 }
885
886 pub fn submit_proposal(&mut self, proposal: Vec<u8>) -> Result<String> {
888 let proposal_id = format!(
889 "proposal_{}_{}",
890 SystemTime::now()
891 .duration_since(std::time::UNIX_EPOCH)
892 .map(|d| d.as_millis())
893 .unwrap_or(0),
894 scirs2_core::random::rng().random::<u64>()
895 );
896
897 let mut vote = Vote {
898 proposal_id: proposal_id.clone(),
899 proposal_data: proposal,
900 votes_for: 1, votes_against: 0,
902 voters: vec![self.node_id.clone()],
903 timestamp: SystemTime::now(),
904 };
905
906 let transport = self.transport.clone();
908 if let Some(ref transport) = transport {
909 let _ = transport.broadcast(ConsensusMessage::RequestVote {
910 term: 0,
911 candidate_id: proposal_id.clone(),
912 last_log_index: 0,
913 last_log_term: 0,
914 });
915
916 let peers_count = self.peers.len();
918 for _ in 0..peers_count {
919 if let Some((_from, msg)) = transport.try_recv() {
920 if let ConsensusMessage::RequestVoteResponse { granted, .. } = msg {
921 if granted {
922 vote.votes_for += 1;
923 } else {
924 vote.votes_against += 1;
925 }
926 }
927 }
928 }
929 }
930
931 self.votes.push_back(vote);
932
933 while self.votes.len() > 1000 {
935 self.votes.pop_front();
936 }
937
938 Ok(proposal_id)
939 }
940
941 pub fn has_majority(&self, proposal_id: &str) -> bool {
943 if let Some(vote) = self.votes.iter().find(|v| v.proposal_id == proposal_id) {
944 let total_nodes = self.peers.len() + 1; vote.votes_for > total_nodes / 2
946 } else {
947 false
948 }
949 }
950}
951
952impl ConsensusManager for SimpleMajorityConsensus {
953 fn start(&mut self) -> Result<()> {
954 self.votes.clear();
955 Ok(())
956 }
957
958 fn propose(&mut self, data: Vec<u8>) -> Result<String> {
959 self.submit_proposal(data)
960 }
961
962 fn get_state(&self) -> ConsensusState {
963 ConsensusState {
964 term: 0,
965 leader: Some(self.node_id.clone()),
966 node_state: NodeState::Leader,
967 log_length: self.votes.len(),
968 committed_index: 0,
969 }
970 }
971}
972
973#[derive(Debug, Clone)]
975pub struct Vote {
976 pub proposal_id: String,
978 pub proposal_data: Vec<u8>,
980 pub votes_for: usize,
982 pub votes_against: usize,
984 pub voters: Vec<String>,
986 pub timestamp: SystemTime,
988}
989
990pub struct ConsensusFactory;
996
997impl ConsensusFactory {
998 pub fn create_consensus(
1000 algorithm: ConsensusAlgorithm,
1001 node_id: String,
1002 peers: Vec<String>,
1003 config: ConsensusConfig,
1004 ) -> Result<Box<dyn ConsensusManager>> {
1005 match algorithm {
1006 ConsensusAlgorithm::Raft => Ok(Box::new(RaftConsensus::new(node_id, peers, config))),
1007 ConsensusAlgorithm::Pbft => Ok(Box::new(PbftConsensus::new(node_id, peers, config))),
1008 ConsensusAlgorithm::SimpleMajority => Ok(Box::new(SimpleMajorityConsensus::new(
1009 node_id, peers, config,
1010 ))),
1011 _ => Err(MetricsError::ConsensusError(format!(
1012 "Consensus algorithm {:?} not implemented",
1013 algorithm
1014 ))),
1015 }
1016 }
1017}
1018
1019#[cfg(test)]
1024mod tests {
1025 use super::*;
1026 use crate::optimization::distributed::transport::InMemoryTransport;
1027
1028 #[test]
1031 fn test_raft_consensus_creation() {
1032 let config = ConsensusConfig::default();
1033 let peers = vec!["node1".to_string(), "node2".to_string()];
1034 let raft = RaftConsensus::new("node0".to_string(), peers, config);
1035
1036 assert_eq!(raft.current_term(), 0);
1037 assert_eq!(*raft.current_state(), NodeState::Follower);
1038 assert_eq!(raft.log_length(), 0);
1039 }
1040
1041 #[test]
1042 fn test_raft_election_without_transport() {
1043 let config = ConsensusConfig::default();
1044 let peers = vec!["node1".to_string(), "node2".to_string()];
1045 let mut raft = RaftConsensus::new("node0".to_string(), peers, config);
1046
1047 raft.start_election().expect("election should not fail");
1048 assert_eq!(*raft.current_state(), NodeState::Candidate);
1049 assert_eq!(raft.current_term(), 1);
1050 }
1051
1052 #[test]
1053 fn test_pbft_consensus_creation() {
1054 let config = ConsensusConfig::default();
1055 let peers = vec![
1056 "node1".to_string(),
1057 "node2".to_string(),
1058 "node3".to_string(),
1059 ];
1060 let pbft = PbftConsensus::new("node0".to_string(), peers, config);
1061
1062 assert!(pbft.has_quorum());
1063 }
1064
1065 #[test]
1066 fn test_simple_majority_consensus() {
1067 let config = ConsensusConfig::default();
1068 let peers = vec!["node1".to_string(), "node2".to_string()];
1069 let mut consensus = SimpleMajorityConsensus::new("node0".to_string(), peers, config);
1070
1071 let proposal_id = consensus
1072 .submit_proposal(b"test proposal".to_vec())
1073 .expect("submit should succeed");
1074 assert!(!consensus.has_majority(&proposal_id));
1076 }
1077
1078 #[test]
1079 fn test_consensus_factory() {
1080 let config = ConsensusConfig::default();
1081 let peers = vec!["node1".to_string()];
1082
1083 let raft = ConsensusFactory::create_consensus(
1084 ConsensusAlgorithm::Raft,
1085 "node0".to_string(),
1086 peers.clone(),
1087 config.clone(),
1088 );
1089 assert!(raft.is_ok());
1090
1091 let pbft = ConsensusFactory::create_consensus(
1092 ConsensusAlgorithm::Pbft,
1093 "node0".to_string(),
1094 peers.clone(),
1095 config.clone(),
1096 );
1097 assert!(pbft.is_ok());
1098
1099 let simple = ConsensusFactory::create_consensus(
1100 ConsensusAlgorithm::SimpleMajority,
1101 "node0".to_string(),
1102 peers,
1103 config,
1104 );
1105 assert!(simple.is_ok());
1106 }
1107
1108 #[test]
1113 fn test_raft_leader_election_in_memory() {
1114 let node_ids = ["n0", "n1", "n2"];
1115 let mut transports = InMemoryTransport::create_network(&node_ids);
1116
1117 let config = ConsensusConfig::default();
1119 let peers_for_n0 = vec!["n1".to_string(), "n2".to_string()];
1120 let mut node0 = RaftConsensus::new("n0".to_string(), peers_for_n0, config.clone());
1121
1122 let (_, t0) = transports.remove(0);
1124 node0.set_transport(Arc::new(t0));
1125
1126 let (_, t1) = transports.remove(0);
1131 let (_, t2) = transports.remove(0);
1132
1133 t1.send(
1135 "n0",
1136 ConsensusMessage::RequestVoteResponse {
1137 term: 1,
1138 granted: true,
1139 },
1140 )
1141 .expect("send should succeed");
1142 t2.send(
1143 "n0",
1144 ConsensusMessage::RequestVoteResponse {
1145 term: 1,
1146 granted: true,
1147 },
1148 )
1149 .expect("send should succeed");
1150
1151 node0.start_election().expect("election should succeed");
1153
1154 assert_eq!(
1156 *node0.current_state(),
1157 NodeState::Leader,
1158 "node 0 should be leader after majority votes"
1159 );
1160 assert_eq!(node0.current_term(), 1);
1161 }
1162
1163 #[test]
1165 fn test_raft_log_replication() {
1166 let node_ids = ["n0", "n1", "n2"];
1167 let mut transports = InMemoryTransport::create_network(&node_ids);
1168
1169 let config = ConsensusConfig::default();
1170
1171 let mut node1 = RaftConsensus::new(
1173 "n1".to_string(),
1174 vec!["n0".to_string(), "n2".to_string()],
1175 config.clone(),
1176 );
1177 let mut node2 = RaftConsensus::new(
1178 "n2".to_string(),
1179 vec!["n0".to_string(), "n1".to_string()],
1180 config.clone(),
1181 );
1182
1183 let (_, t0) = transports.remove(0);
1184 let (_, t1) = transports.remove(0);
1185 let (_, t2) = transports.remove(0);
1186
1187 let t1_arc: Arc<dyn Transport> = Arc::new(t1);
1189 let t2_arc: Arc<dyn Transport> = Arc::new(t2);
1190 node1.set_transport(Arc::clone(&t1_arc));
1191 node2.set_transport(Arc::clone(&t2_arc));
1192
1193 let mut node0 = RaftConsensus::new(
1195 "n0".to_string(),
1196 vec!["n1".to_string(), "n2".to_string()],
1197 config.clone(),
1198 );
1199 node0.set_transport(Arc::new(t0));
1200
1201 node0.become_leader().expect("become_leader should succeed");
1203
1204 t1_arc
1206 .send(
1207 "n0",
1208 ConsensusMessage::AppendEntriesResponse {
1209 term: 0,
1210 success: true,
1211 match_index: 1,
1212 },
1213 )
1214 .expect("send ack n1->n0");
1215 t2_arc
1216 .send(
1217 "n0",
1218 ConsensusMessage::AppendEntriesResponse {
1219 term: 0,
1220 success: true,
1221 match_index: 1,
1222 },
1223 )
1224 .expect("send ack n2->n0");
1225
1226 let entry_id = node0
1228 .propose(b"hello world".to_vec())
1229 .expect("propose should succeed");
1230 assert!(!entry_id.is_empty());
1231
1232 node1.poll_messages().expect("poll n1");
1234 node2.poll_messages().expect("poll n2");
1235
1236 assert_eq!(node1.log_length(), 1, "follower n1 should have 1 log entry");
1238 assert_eq!(node2.log_length(), 1, "follower n2 should have 1 log entry");
1239 }
1240
1241 #[test]
1247 fn test_pbft_consensus_three_nodes() {
1248 let node_ids = ["p0", "r1", "r2"];
1249 let transports = InMemoryTransport::create_network(&node_ids);
1250
1251 let config = ConsensusConfig::default();
1252 let mut primary = PbftConsensus::new(
1253 "p0".to_string(),
1254 vec!["r1".to_string(), "r2".to_string()],
1255 config,
1256 );
1257
1258 let (_, tp) = transports.into_iter().next().expect("primary transport");
1259 let tp_arc: Arc<dyn Transport> = Arc::new(tp);
1260
1261 let digest = primary.compute_digest(b"cmd");
1263 tp_arc
1264 .send(
1265 "p0",
1266 ConsensusMessage::PbftPrepare {
1267 view: 0,
1268 sequence: 1,
1269 digest: digest.clone(),
1270 node_id: "r1".to_string(),
1271 },
1272 )
1273 .expect("send prepare r1");
1274 tp_arc
1275 .send(
1276 "p0",
1277 ConsensusMessage::PbftPrepare {
1278 view: 0,
1279 sequence: 1,
1280 digest: digest.clone(),
1281 node_id: "r2".to_string(),
1282 },
1283 )
1284 .expect("send prepare r2");
1285
1286 tp_arc
1288 .send(
1289 "p0",
1290 ConsensusMessage::PbftCommit {
1291 view: 0,
1292 sequence: 1,
1293 digest: digest.clone(),
1294 node_id: "r1".to_string(),
1295 },
1296 )
1297 .expect("send commit r1");
1298 tp_arc
1299 .send(
1300 "p0",
1301 ConsensusMessage::PbftCommit {
1302 view: 0,
1303 sequence: 1,
1304 digest,
1305 node_id: "r2".to_string(),
1306 },
1307 )
1308 .expect("send commit r2");
1309
1310 primary.set_transport(Arc::clone(&tp_arc));
1311
1312 let result = primary.start_consensus(b"cmd".to_vec());
1313 assert!(
1314 result.is_ok(),
1315 "PBFT consensus should succeed: {:?}",
1316 result
1317 );
1318 }
1319}