use crate::error::{MetricsError, Result};
use crate::optimization::distributed::transport::{ConsensusMessage, Transport};
use scirs2_core::random::{Rng, RngExt};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
pub use super::config::{ConsensusAlgorithm, ConsensusConfig};
/// Common interface implemented by the consensus backends in this module
/// (`RaftConsensus`, `PbftConsensus` and `SimpleMajorityConsensus`).
pub trait ConsensusManager: Send + Sync {
/// Resets or initialises the backend's bookkeeping so it is ready for use.
fn start(&mut self) -> Result<()>;
/// Submits `data` as a new proposal and returns an identifier string for it.
fn propose(&mut self, data: Vec<u8>) -> Result<String>;
/// Returns a snapshot of the backend's state as a [`ConsensusState`].
fn get_state(&self) -> ConsensusState;
}
/// Simplified Raft node: term/vote bookkeeping, an in-memory log and an
/// optional transport used to exchange `ConsensusMessage`s with peers.
pub struct RaftConsensus {
/// Identifier of this node; sent as `candidate_id` / `leader_id` in outgoing messages.
node_id: String,
/// Latest term this node has seen.
current_term: u64,
/// Node this one is currently bound to for the term (a candidate or the leader), if any.
voted_for: Option<String>,
/// In-memory log. Entries created by this type carry `index = log.len() + 1`, i.e. they are 1-based.
log: Vec<LogEntry>,
/// Current role of the node.
state: NodeState,
/// Peers known at construction time, keyed by peer id.
peers: HashMap<String, PeerState>,
/// Configuration; `election_timeout_ms` is read by `start_election`.
config: ConsensusConfig,
/// Instant of the last heartbeat-like event; the election timer is measured from it.
last_heartbeat: Instant,
/// Duration after which `is_election_timeout` reports a timeout; re-randomised by `start_election`.
election_timeout: Duration,
/// Per-peer next log position, reset by `become_leader`.
next_index: HashMap<String, usize>,
/// Per-peer replicated position, reset by `become_leader`.
match_index: HashMap<String, usize>,
/// Highest log index this node considers committed.
commit_index: u64,
/// Network transport; while `None`, methods update local state only and send nothing.
transport: Option<Arc<dyn Transport>>,
}
// Compact debug output limited to the node id, the current term and the role;
// the log, peers and transport are intentionally left out.
impl std::fmt::Debug for RaftConsensus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("RaftConsensus");
        out.field("node_id", &self.node_id);
        out.field("current_term", &self.current_term);
        out.field("state", &self.state);
        out.finish()
    }
}
impl RaftConsensus {
pub fn new(node_id: String, peers: Vec<String>, config: ConsensusConfig) -> Self {
let mut peer_states = HashMap::new();
for peer in peers {
peer_states.insert(
peer.clone(),
PeerState {
id: peer,
last_seen: Instant::now(),
is_healthy: true,
address: None,
},
);
}
Self {
node_id,
current_term: 0,
voted_for: None,
log: vec![],
state: NodeState::Follower,
peers: peer_states,
config,
last_heartbeat: Instant::now(),
election_timeout: Duration::from_millis(5000),
next_index: HashMap::new(),
match_index: HashMap::new(),
commit_index: 0,
transport: None,
}
}
/// Attaches the transport used for all subsequent sends and receives,
/// replacing any transport set earlier.
pub fn set_transport(&mut self, t: Arc<dyn Transport>) {
self.transport = Some(t);
}
pub fn poll_messages(&mut self) -> Result<()> {
let transport = match self.transport.clone() {
Some(t) => t,
None => return Ok(()),
};
let max_msgs = 256;
for _ in 0..max_msgs {
match transport.try_recv() {
None => break,
Some((from, msg)) => self.handle_message(from, msg)?,
}
}
Ok(())
}
/// Processes one incoming message from `from` and sends a reply where the
/// message type calls for one.
///
/// * `RequestVote` - delegates to `handle_vote_request` and answers with a
///   `RequestVoteResponse`.
/// * `AppendEntries` - delegates to `handle_append_entries` and answers with
///   an `AppendEntriesResponse`. On success the reported `match_index` is
///   `prev_log_index` plus the number of entries carried, so an empty
///   heartbeat never claims more than the leader already knew.
/// * `RequestVoteResponse` - only used to step down on a higher term; votes
///   themselves are tallied synchronously inside `start_election`.
/// * `AppendEntriesResponse` - steps down on a higher term; otherwise, while
///   leader, records the peer's progress and advances the commit index to
///   the highest index stored on a majority.
/// * Any other message is ignored.
///
/// Replies are best-effort: a failed send is deliberately ignored because
/// the sender is expected to retry.
fn handle_message(&mut self, from: String, msg: ConsensusMessage) -> Result<()> {
    match msg {
        ConsensusMessage::RequestVote {
            term,
            candidate_id,
            last_log_index: _,
            last_log_term: _,
        } => {
            let granted = self.handle_vote_request(term, candidate_id)?;
            if let Some(transport) = self.transport.as_ref() {
                let _ = transport.send(
                    &from,
                    ConsensusMessage::RequestVoteResponse {
                        term: self.current_term,
                        granted,
                    },
                );
            }
        }
        ConsensusMessage::AppendEntries {
            term,
            leader_id,
            prev_log_index,
            prev_log_term: _,
            entries,
            leader_commit,
        } => {
            let appended = entries.len() as u64;
            let success =
                self.handle_append_entries(term, &leader_id, entries, leader_commit)?;
            let match_index = if success {
                prev_log_index.saturating_add(appended)
            } else {
                prev_log_index
            };
            if let Some(transport) = self.transport.as_ref() {
                let _ = transport.send(
                    &from,
                    ConsensusMessage::AppendEntriesResponse {
                        term: self.current_term,
                        success,
                        match_index,
                    },
                );
            }
        }
        ConsensusMessage::RequestVoteResponse { term, granted: _ } => {
            if term > self.current_term {
                self.step_down(term);
            }
            // A granted vote arriving here is late: the election that asked
            // for it has already finished counting, so it is not tallied.
        }
        ConsensusMessage::AppendEntriesResponse {
            term,
            success,
            match_index,
        } => {
            if term > self.current_term {
                self.step_down(term);
            } else if success && term == self.current_term && self.state == NodeState::Leader
            {
                self.record_replication(from, match_index);
            }
        }
        _ => {}
    }
    Ok(())
}

/// Adopts the newer `term`, reverts to follower and clears the vote so the
/// node can vote again in that term.
fn step_down(&mut self, term: u64) {
    self.current_term = term;
    self.state = NodeState::Follower;
    self.voted_for = None;
}

/// Records that `peer` holds the log up to `match_index` and raises the
/// commit index to the highest index replicated on a majority of the cluster.
fn record_replication(&mut self, peer: String, match_index: u64) {
    // Acknowledgements from nodes outside the configured cluster do not count.
    if !self.peers.contains_key(&peer) {
        return;
    }
    let log_len = self.log.len();
    // Never trust a claim that exceeds what this leader actually holds.
    let acknowledged = match_index.min(log_len as u64) as usize;
    let progress = self.match_index.entry(peer).or_insert(0);
    if acknowledged > *progress {
        *progress = acknowledged;
    }

    let mut replicated: Vec<usize> = self
        .peers
        .keys()
        .map(|id| self.match_index.get(id).copied().unwrap_or(0))
        .collect();
    replicated.push(log_len); // the leader's own log
    replicated.sort_unstable_by(|a, b| b.cmp(a));

    // In descending order, position `cluster_size / 2` is the largest index
    // that a strict majority of nodes has reached.
    let cluster_size = self.peers.len() + 1;
    if let Some(&majority_index) = replicated.get(cluster_size / 2) {
        let majority_index = majority_index as u64;
        if majority_index > self.commit_index {
            self.commit_index = majority_index;
        }
    }
}
pub fn handle_append_entries(
&mut self,
term: u64,
leader_id: &str,
entries: Vec<Vec<u8>>,
leader_commit: u64,
) -> Result<bool> {
if term < self.current_term {
return Ok(false);
}
self.current_term = term;
self.state = NodeState::Follower;
self.last_heartbeat = Instant::now();
self.voted_for = Some(leader_id.to_string());
for raw in entries {
let entry = LogEntry {
term,
index: self.log.len() as u64 + 1,
command: Command::UserData(raw),
timestamp: SystemTime::now(),
};
self.log.push(entry);
}
if leader_commit > self.commit_index {
self.commit_index = leader_commit.min(self.log.len() as u64);
}
Ok(true)
}
/// Starts a new election: bumps the term, becomes a candidate, votes for
/// itself and picks a fresh randomised election timeout.
///
/// With a transport attached, a `RequestVote` is broadcast and up to one
/// reply per peer is read without blocking:
/// * a reply with a higher term makes the node step down (clearing its vote)
///   and return immediately;
/// * a granted vote counts once per sender and only if it is for the current
///   term;
/// * any other message read meanwhile is passed to `handle_message` rather
///   than being discarded.
///
/// The node becomes leader when it is still a candidate and holds votes from
/// a strict majority. Without a transport it simply stays a candidate.
///
/// # Errors
/// Propagates errors from `handle_message` and `become_leader`.
pub fn start_election(&mut self) -> Result<()> {
    self.current_term += 1;
    self.state = NodeState::Candidate;
    self.voted_for = Some(self.node_id.clone());
    self.last_heartbeat = Instant::now();

    let base_timeout = self.config.election_timeout_ms;
    let jitter_bound = base_timeout / 2;
    // Sampling from an empty range panics, so skip the jitter for tiny timeouts.
    let jitter = if jitter_bound > 0 {
        scirs2_core::random::rng().random_range(0..jitter_bound)
    } else {
        0
    };
    self.election_timeout = Duration::from_millis(base_timeout + jitter);

    let transport = match self.transport.clone() {
        Some(transport) => transport,
        None => return Ok(()),
    };

    let peers = transport.peer_ids();
    let msg = ConsensusMessage::RequestVote {
        term: self.current_term,
        candidate_id: self.node_id.clone(),
        last_log_index: self.log.len() as u64,
        last_log_term: self.log.last().map(|e| e.term).unwrap_or(0),
    };
    // Best-effort: peers that miss the request simply do not vote.
    let _ = transport.broadcast(msg);

    let total = peers.len() as u64 + 1;
    let mut votes = 1u64; // own vote
    let mut voters = std::collections::HashSet::new();
    for _ in 0..peers.len() {
        let (from, reply) = match transport.try_recv() {
            Some(pair) => pair,
            None => continue,
        };
        match reply {
            ConsensusMessage::RequestVoteResponse { term, granted } => {
                if term > self.current_term {
                    self.current_term = term;
                    self.state = NodeState::Follower;
                    // Allow voting again in the newer term.
                    self.voted_for = None;
                    return Ok(());
                }
                if granted && term == self.current_term && voters.insert(from) {
                    votes += 1;
                }
            }
            other => self.handle_message(from, other)?,
        }
    }

    // A message handled above may have demoted this node in the meantime.
    if self.state == NodeState::Candidate && votes > total / 2 {
        self.become_leader()?;
    }
    Ok(())
}
/// Appends `entries` to the local log in the given order, without any
/// validation, and always reports success.
pub fn append_entries(&mut self, entries: Vec<LogEntry>) -> Result<bool> {
    self.log.extend(entries);
    Ok(true)
}
/// Decides whether to grant a vote to `candidate_id` for `term`.
///
/// A higher `term` is adopted first: the node becomes a follower and its
/// previous vote is cleared. The vote is granted only when `term` equals the
/// (possibly just updated) current term and the node has either not voted yet
/// or already voted for the same candidate. Granting records the vote.
pub fn handle_vote_request(&mut self, term: u64, candidate_id: String) -> Result<bool> {
    if term > self.current_term {
        self.current_term = term;
        self.voted_for = None;
        self.state = NodeState::Follower;
    }

    if term != self.current_term {
        return Ok(false);
    }

    let bound_to_other = match self.voted_for.as_ref() {
        Some(previous) => *previous != candidate_id,
        None => false,
    };
    if bound_to_other {
        return Ok(false);
    }

    self.voted_for = Some(candidate_id);
    Ok(true)
}
/// Switches the node to the `Leader` state and resets per-peer replication
/// bookkeeping: `next_index` is set to the current log length and
/// `match_index` to zero for every known peer.
pub fn become_leader(&mut self) -> Result<()> {
    self.state = NodeState::Leader;
    let next = self.log.len();
    for id in self.peers.keys() {
        self.next_index.insert(id.clone(), next);
        self.match_index.insert(id.clone(), 0);
    }
    Ok(())
}
/// Broadcasts an empty `AppendEntries` heartbeat when this node is leader.
///
/// Does nothing for non-leaders. The heartbeat advertises the commit index as
/// both `prev_log_index` and `leader_commit`. `prev_log_term` is the term of
/// the entry with that index; because log indices are 1-based, that entry is
/// stored at position `commit_index - 1`, and the term is 0 when nothing has
/// been committed or the entry is missing.
///
/// The broadcast is best-effort; a transport error is ignored because the
/// next heartbeat supersedes this one.
pub fn send_heartbeat(&mut self) -> Result<()> {
    if self.state != NodeState::Leader {
        return Ok(());
    }
    self.last_heartbeat = Instant::now();

    if let Some(transport) = self.transport.as_ref() {
        let prev_log_term = self
            .commit_index
            .checked_sub(1)
            .and_then(|position| self.log.get(position as usize))
            .map(|entry| entry.term)
            .unwrap_or(0);
        let msg = ConsensusMessage::AppendEntries {
            term: self.current_term,
            leader_id: self.node_id.clone(),
            prev_log_index: self.commit_index,
            prev_log_term,
            entries: vec![],
            leader_commit: self.commit_index,
        };
        let _ = transport.broadcast(msg);
    }
    Ok(())
}
/// Returns `true` once more time than `election_timeout` has elapsed since
/// `last_heartbeat` was last refreshed.
pub fn is_election_timeout(&self) -> bool {
self.last_heartbeat.elapsed() > self.election_timeout
}
/// Returns the number of entries currently held in the local log.
pub fn log_length(&self) -> usize {
self.log.len()
}
/// Returns the latest term this node has seen.
pub fn current_term(&self) -> u64 {
self.current_term
}
/// Returns the node's current role (follower, candidate or leader).
pub fn current_state(&self) -> &NodeState {
&self.state
}
/// Returns the highest log index this node considers committed.
pub fn commit_index(&self) -> u64 {
self.commit_index
}
}
impl ConsensusManager for RaftConsensus {
/// Restarts the election timer by setting `last_heartbeat` to now; no other
/// state is changed.
fn start(&mut self) -> Result<()> {
self.last_heartbeat = Instant::now();
Ok(())
}
/// Appends `data` to the leader's log as a `Command::UserData` entry and
/// tries to replicate it.
///
/// Returns the entry id (`entry_<term>_<index>`) once the entry is in the
/// local log. The id does not imply the entry is committed; check
/// `commit_index` for that.
///
/// With a transport attached the entry is broadcast in an `AppendEntries`
/// message and up to one reply per peer is read without blocking:
/// * a reply with a higher term makes the node step down; the entry stays in
///   the log uncommitted and the id is still returned;
/// * a successful acknowledgement counts once per sender and only if its
///   `match_index` covers the new entry;
/// * any other message read meanwhile is passed to `handle_message` rather
///   than being discarded.
///
/// The commit index advances to the new entry only when a strict majority
/// (including this node) acknowledged it and the node is still leader.
///
/// # Errors
/// Returns `MetricsError::ConsensusError` when the node is not the leader or
/// the entry cannot be serialized (in which case the log is left untouched);
/// propagates errors from `handle_message`.
fn propose(&mut self, data: Vec<u8>) -> Result<String> {
    if self.state != NodeState::Leader {
        return Err(MetricsError::ConsensusError(
            "Only leader can propose entries".to_string(),
        ));
    }

    let entry = LogEntry {
        term: self.current_term,
        index: self.log.len() as u64 + 1,
        command: Command::UserData(data),
        timestamp: SystemTime::now(),
    };
    let entry_index = entry.index;
    let entry_id = format!("entry_{}_{}", self.current_term, entry_index);

    let transport = match self.transport.clone() {
        Some(transport) => transport,
        None => {
            self.log.push(entry);
            return Ok(entry_id);
        }
    };

    // Serialize before mutating the log so a failure leaves no partial state.
    let entry_bytes = serde_json::to_vec(&entry.command).map_err(|err| {
        MetricsError::ConsensusError(format!("Failed to serialize log entry: {}", err))
    })?;
    let prev_log_index = entry_index - 1;
    let prev_log_term = self.log.last().map(|prev| prev.term).unwrap_or(0);
    self.log.push(entry);

    let msg = ConsensusMessage::AppendEntries {
        term: self.current_term,
        leader_id: self.node_id.clone(),
        prev_log_index,
        prev_log_term,
        entries: vec![entry_bytes],
        leader_commit: self.commit_index,
    };
    // Best-effort: followers that miss it are simply not counted below.
    let _ = transport.broadcast(msg);

    let peers = transport.peer_ids();
    let total = peers.len() as u64 + 1;
    let mut acks = 1u64; // the leader's own copy
    let mut acked_by = std::collections::HashSet::new();
    for _ in 0..peers.len() {
        let (from, resp) = match transport.try_recv() {
            Some(pair) => pair,
            None => continue,
        };
        match resp {
            ConsensusMessage::AppendEntriesResponse {
                term,
                success,
                match_index,
            } => {
                if term > self.current_term {
                    self.current_term = term;
                    self.state = NodeState::Follower;
                    self.voted_for = None;
                    return Ok(entry_id);
                }
                if success && match_index >= entry_index && acked_by.insert(from) {
                    acks += 1;
                }
            }
            other => self.handle_message(from, other)?,
        }
    }

    // A message handled above may have demoted this node in the meantime.
    if self.state == NodeState::Leader && acks > total / 2 && entry_index > self.commit_index
    {
        self.commit_index = entry_index;
    }
    Ok(entry_id)
}
/// Builds a [`ConsensusState`] snapshot of this node. `leader` is this
/// node's id when it is the leader and `None` otherwise.
fn get_state(&self) -> ConsensusState {
    let leader = match self.state {
        NodeState::Leader => Some(self.node_id.clone()),
        _ => None,
    };
    let committed_index = self.commit_index as usize;
    ConsensusState {
        term: self.current_term,
        leader,
        node_state: self.state.clone(),
        log_length: self.log.len(),
        committed_index,
    }
}
}
/// Backend-independent snapshot returned by `ConsensusManager::get_state`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsensusState {
/// Current term (Raft) or view (PBFT); the simple-majority backend reports 0.
pub term: u64,
/// Leader as known to the reporting node, if any.
pub leader: Option<String>,
/// Role of the reporting node.
pub node_state: NodeState,
/// Number of records held by the backend (log entries, PBFT messages or votes).
pub log_length: usize,
/// Committed index; only the Raft backend reports a non-zero value.
pub committed_index: usize,
}
/// Role of a node within the cluster.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeState {
/// Passive node that accepts entries from a leader.
Follower,
/// Node that has started an election and is collecting votes.
Candidate,
/// Node that accepts proposals and sends heartbeats.
Leader,
}
/// One record of the Raft log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
/// Term in which the entry was created.
pub term: u64,
/// Position of the entry; entries created in this module are numbered from 1.
pub index: u64,
/// Payload carried by the entry.
pub command: Command,
/// Wall-clock time at which the entry was created locally.
pub timestamp: SystemTime,
}
/// Payload of a [`LogEntry`]. Within this file only `UserData` is ever
/// constructed; the other variants are declared for callers elsewhere.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Command {
/// Entry without payload.
NoOp,
/// Opaque application bytes.
UserData(Vec<u8>),
/// Cluster membership change for the node `node_id`.
ConfigChange {
change_type: ConfigChangeType,
node_id: String,
address: Option<SocketAddr>,
},
/// Snapshot data together with the index/term of the last entry it includes.
Snapshot {
last_included_index: u64,
last_included_term: u64,
data: Vec<u8>,
},
}
/// Kind of membership change carried by `Command::ConfigChange`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConfigChangeType {
/// Add a node to the cluster.
AddNode,
/// Remove a node from the cluster.
RemoveNode,
/// Update an existing node (presumably its address — confirm with callers).
UpdateNode,
}
/// Local bookkeeping about one remote peer.
#[derive(Debug, Clone)]
pub struct PeerState {
/// Identifier of the peer.
pub id: String,
/// Time of the last recorded contact; set to construction time by the `new` functions in this file.
pub last_seen: Instant,
/// Whether the peer counts as healthy; read by `PbftConsensus::has_quorum`.
pub is_healthy: bool,
/// Network address of the peer, if known; `None` when built by the `new` functions in this file.
pub address: Option<SocketAddr>,
}
/// Simplified PBFT node that drives the pre-prepare / prepare / commit
/// exchange from the proposing side.
pub struct PbftConsensus {
/// Identifier of this node.
node_id: String,
/// Current view number.
current_view: u64,
/// Sequence number of the most recent request; incremented per `start_consensus` call.
sequence_number: u64,
/// Peers known at construction time, keyed by peer id.
peers: HashMap<String, PeerState>,
/// Configuration supplied at construction; not read by the code in this file.
config: ConsensusConfig,
/// Record of protocol messages created or accepted by this node.
message_log: Vec<PbftMessage>,
/// Network transport; while `None`, `start_consensus` performs no network exchange.
transport: Option<Arc<dyn Transport>>,
}
// Compact debug output limited to the node id and the current view.
impl std::fmt::Debug for PbftConsensus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("PbftConsensus");
        out.field("node_id", &self.node_id);
        out.field("current_view", &self.current_view);
        out.finish()
    }
}
impl PbftConsensus {
/// Creates a PBFT node at view 0 and sequence 0 with an empty message log
/// and no transport. Every peer starts out healthy with no known address.
pub fn new(node_id: String, peers: Vec<String>, config: ConsensusConfig) -> Self {
    let peers = peers
        .into_iter()
        .map(|peer_id| {
            let state = PeerState {
                id: peer_id.clone(),
                last_seen: Instant::now(),
                is_healthy: true,
                address: None,
            };
            (peer_id, state)
        })
        .collect::<HashMap<_, _>>();

    Self {
        node_id,
        current_view: 0,
        sequence_number: 0,
        peers,
        config,
        message_log: Vec::new(),
        transport: None,
    }
}
/// Attaches the transport used for all subsequent sends and receives,
/// replacing any transport set earlier.
pub fn set_transport(&mut self, t: Arc<dyn Transport>) {
self.transport = Some(t);
}
/// Reports whether strictly more than two thirds (integer division) of the
/// cluster is healthy. The local node always counts as healthy.
pub fn has_quorum(&self) -> bool {
    let healthy_peers = self.peers.values().filter(|peer| peer.is_healthy).count();
    let cluster_size = self.peers.len() + 1;
    let threshold = cluster_size * 2 / 3;
    healthy_peers + 1 > threshold
}
/// Maximum number of faulty nodes tolerated: `(n - 1) / 3` for a cluster of
/// `n` nodes, where `n - 1` is exactly the number of peers.
fn f(&self) -> usize {
    let peer_count = self.peers.len();
    peer_count / 3
}
pub fn start_consensus(&mut self, request: Vec<u8>) -> Result<String> {
if !self.has_quorum() {
return Err(MetricsError::ConsensusError(
"Insufficient nodes for consensus".to_string(),
));
}
self.sequence_number += 1;
let digest = self.compute_digest(&request);
let seq = self.sequence_number;
let view = self.current_view;
let pre_prepare = PbftMessage {
message_type: PbftMessageType::PrePrepare,
view,
sequence: seq,
digest: digest.clone(),
node_id: self.node_id.clone(),
data: request.clone(),
timestamp: SystemTime::now(),
};
self.message_log.push(pre_prepare.clone());
let transport = self.transport.clone();
if let Some(ref transport) = transport {
let _ = transport.broadcast(ConsensusMessage::PbftPrePrepare {
view,
sequence: seq,
digest: digest.clone(),
data: request,
node_id: self.node_id.clone(),
});
let quorum = 2 * self.f() + 1;
let peers_count = self.peers.len();
let mut prepares = 1usize;
for _ in 0..peers_count {
if let Some((_from, msg)) = transport.try_recv() {
if let ConsensusMessage::PbftPrepare {
sequence: s,
digest: d,
..
} = &msg
{
if *s == seq && *d == digest {
prepares += 1;
let prepare_msg = PbftMessage {
message_type: PbftMessageType::Prepare,
view,
sequence: seq,
digest: digest.clone(),
node_id: self.node_id.clone(),
data: vec![],
timestamp: SystemTime::now(),
};
self.message_log.push(prepare_msg);
}
}
}
}
if prepares >= quorum {
let _ = transport.broadcast(ConsensusMessage::PbftCommit {
view,
sequence: seq,
digest: digest.clone(),
node_id: self.node_id.clone(),
});
let mut commits = 1usize;
for _ in 0..peers_count {
if let Some((_from, msg)) = transport.try_recv() {
if let ConsensusMessage::PbftCommit {
sequence: s,
digest: d,
..
} = &msg
{
if *s == seq && *d == digest {
commits += 1;
let commit_msg = PbftMessage {
message_type: PbftMessageType::Commit,
view,
sequence: seq,
digest: digest.clone(),
node_id: self.node_id.clone(),
data: vec![],
timestamp: SystemTime::now(),
};
self.message_log.push(commit_msg);
}
}
}
}
if commits < quorum {
return Err(MetricsError::ConsensusError(
"PBFT commit quorum not reached".to_string(),
));
}
} else {
return Err(MetricsError::ConsensusError(
"PBFT prepare quorum not reached".to_string(),
));
}
}
Ok(format!("pbft_{}_{}", view, seq))
}
/// Returns a lowercase hexadecimal fingerprint of `data`, computed with the
/// standard library's `DefaultHasher`.
///
/// NOTE(review): `DefaultHasher` is not a cryptographic hash, and its
/// algorithm is not guaranteed to stay the same across Rust releases.
fn compute_digest(&self, data: &[u8]) -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::new();
    Hash::hash(data, &mut state);
    let fingerprint = state.finish();
    format!("{:x}", fingerprint)
}
}
// `ConsensusManager` adapter for the PBFT backend.
impl ConsensusManager for PbftConsensus {
    /// Resets the view and the sequence number to zero.
    fn start(&mut self) -> Result<()> {
        self.sequence_number = 0;
        self.current_view = 0;
        Ok(())
    }

    /// Runs a PBFT round for `data`; see `start_consensus`.
    fn propose(&mut self, data: Vec<u8>) -> Result<String> {
        self.start_consensus(data)
    }

    /// Reports the view as the term and names the primary
    /// `primary_<view mod cluster size>`. The node state is always reported
    /// as `Follower` and the committed index as 0.
    fn get_state(&self) -> ConsensusState {
        let cluster_size = (self.peers.len() + 1) as u64;
        let primary_slot = self.current_view % cluster_size;
        ConsensusState {
            term: self.current_view,
            leader: Some(format!("primary_{}", primary_slot)),
            node_state: NodeState::Follower,
            log_length: self.message_log.len(),
            committed_index: 0,
        }
    }
}
/// Phase of a logged PBFT message. Only `PrePrepare`, `Prepare` and `Commit`
/// are produced by the code in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PbftMessageType {
PrePrepare,
Prepare,
Commit,
ViewChange,
NewView,
}
/// One record of the PBFT message log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PbftMessage {
/// Protocol phase this record belongs to.
pub message_type: PbftMessageType,
/// View in which the message was created.
pub view: u64,
/// Sequence number of the request the message refers to.
pub sequence: u64,
/// Digest of the request, as produced by `PbftConsensus::compute_digest`.
pub digest: String,
/// Node id recorded for the message.
pub node_id: String,
/// Request payload; filled for pre-prepare records and empty for prepare/commit records.
pub data: Vec<u8>,
/// Wall-clock time at which the record was created locally.
pub timestamp: SystemTime,
}
/// Leaderless backend in which a proposal passes when more than half of the
/// cluster votes for it.
pub struct SimpleMajorityConsensus {
/// Identifier of this node.
node_id: String,
/// Peers known at construction time, keyed by peer id.
peers: HashMap<String, PeerState>,
/// Recent proposals with their tallies, oldest first; capped at 1000 by `submit_proposal`.
votes: VecDeque<Vote>,
/// Configuration supplied at construction; not read by the code in this file.
config: ConsensusConfig,
/// Network transport; while `None`, proposals only receive the local vote.
transport: Option<Arc<dyn Transport>>,
}
// Compact debug output limited to the node id.
impl std::fmt::Debug for SimpleMajorityConsensus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SimpleMajorityConsensus");
        out.field("node_id", &self.node_id);
        out.finish()
    }
}
impl SimpleMajorityConsensus {
/// Creates a node with no recorded proposals and no transport. Every peer
/// starts out healthy with no known address.
pub fn new(node_id: String, peers: Vec<String>, config: ConsensusConfig) -> Self {
    let peers = peers
        .into_iter()
        .map(|peer_id| {
            let state = PeerState {
                id: peer_id.clone(),
                last_seen: Instant::now(),
                is_healthy: true,
                address: None,
            };
            (peer_id, state)
        })
        .collect::<HashMap<_, _>>();

    Self {
        node_id,
        peers,
        votes: VecDeque::new(),
        config,
        transport: None,
    }
}
/// Attaches the transport used for all subsequent sends and receives,
/// replacing any transport set earlier.
pub fn set_transport(&mut self, t: Arc<dyn Transport>) {
self.transport = Some(t);
}
/// Records a new proposal, collects votes for it and returns its id.
///
/// The id has the form `proposal_<unix millis>_<random u64>`. The local node
/// always votes in favour. With a transport attached the proposal is
/// announced through a `RequestVote` message (term 0, the proposal id as
/// `candidate_id`) and up to one reply per peer is read without blocking.
/// Every responding node is recorded in `voters` and counted at most once,
/// so a repeated reply cannot tip the tally.
///
/// The tally is stored even when no majority was reached; use `has_majority`
/// to query the outcome. Only the 1000 most recent proposals are retained.
pub fn submit_proposal(&mut self, proposal: Vec<u8>) -> Result<String> {
    let proposal_id = format!(
        "proposal_{}_{}",
        SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis())
            .unwrap_or(0),
        scirs2_core::random::rng().random::<u64>()
    );
    let mut vote = Vote {
        proposal_id: proposal_id.clone(),
        proposal_data: proposal,
        votes_for: 1, // the proposer's own vote
        votes_against: 0,
        voters: vec![self.node_id.clone()],
        timestamp: SystemTime::now(),
    };

    if let Some(transport) = self.transport.clone() {
        // Best-effort: peers that miss the announcement simply do not vote.
        let _ = transport.broadcast(ConsensusMessage::RequestVote {
            term: 0,
            candidate_id: proposal_id.clone(),
            last_log_index: 0,
            last_log_term: 0,
        });
        for _ in 0..self.peers.len() {
            let (from, msg) = match transport.try_recv() {
                Some(pair) => pair,
                None => continue,
            };
            if let ConsensusMessage::RequestVoteResponse { granted, .. } = msg {
                // One ballot per node, the proposer included.
                if vote.voters.contains(&from) {
                    continue;
                }
                if granted {
                    vote.votes_for += 1;
                } else {
                    vote.votes_against += 1;
                }
                vote.voters.push(from);
            }
        }
    }

    self.votes.push_back(vote);
    while self.votes.len() > 1000 {
        self.votes.pop_front();
    }
    Ok(proposal_id)
}
/// Reports whether the proposal `proposal_id` gathered votes from strictly
/// more than half of the cluster (peers plus this node). Unknown or already
/// evicted proposals yield `false`.
pub fn has_majority(&self, proposal_id: &str) -> bool {
    let half_of_cluster = (self.peers.len() + 1) / 2;
    self.votes
        .iter()
        .find(|recorded| recorded.proposal_id == proposal_id)
        .map_or(false, |recorded| recorded.votes_for > half_of_cluster)
}
}
// `ConsensusManager` adapter for the simple-majority backend.
impl ConsensusManager for SimpleMajorityConsensus {
    /// Discards all recorded proposals.
    fn start(&mut self) -> Result<()> {
        self.votes.clear();
        Ok(())
    }

    /// Submits `data` as a proposal; see `submit_proposal`.
    fn propose(&mut self, data: Vec<u8>) -> Result<String> {
        self.submit_proposal(data)
    }

    /// Reports this node as leader with term 0 and committed index 0;
    /// `log_length` is the number of retained proposals.
    fn get_state(&self) -> ConsensusState {
        let leader = Some(self.node_id.clone());
        let log_length = self.votes.len();
        ConsensusState {
            term: 0,
            leader,
            node_state: NodeState::Leader,
            log_length,
            committed_index: 0,
        }
    }
}
/// Tally for one proposal of the simple-majority backend.
#[derive(Debug, Clone)]
pub struct Vote {
/// Identifier returned to the proposer.
pub proposal_id: String,
/// Payload of the proposal.
pub proposal_data: Vec<u8>,
/// Number of votes in favour; starts at 1 for the proposer.
pub votes_for: usize,
/// Number of votes against.
pub votes_against: usize,
/// Ids of nodes recorded as having voted; starts with the proposer.
pub voters: Vec<String>,
/// Wall-clock time at which the proposal was created.
pub timestamp: SystemTime,
}
/// Stateless factory that builds a boxed [`ConsensusManager`] for a chosen
/// algorithm.
pub struct ConsensusFactory;

impl ConsensusFactory {
    /// Builds the backend selected by `algorithm` for node `node_id` with the
    /// given `peers` and `config`. The returned backend has no transport.
    ///
    /// # Errors
    /// Returns `MetricsError::ConsensusError` for any algorithm other than
    /// `Raft`, `Pbft` or `SimpleMajority`.
    pub fn create_consensus(
        algorithm: ConsensusAlgorithm,
        node_id: String,
        peers: Vec<String>,
        config: ConsensusConfig,
    ) -> Result<Box<dyn ConsensusManager>> {
        let manager: Box<dyn ConsensusManager> = match algorithm {
            ConsensusAlgorithm::Raft => Box::new(RaftConsensus::new(node_id, peers, config)),
            ConsensusAlgorithm::Pbft => Box::new(PbftConsensus::new(node_id, peers, config)),
            ConsensusAlgorithm::SimpleMajority => {
                Box::new(SimpleMajorityConsensus::new(node_id, peers, config))
            }
            _ => {
                return Err(MetricsError::ConsensusError(format!(
                    "Consensus algorithm {:?} not implemented",
                    algorithm
                )));
            }
        };
        Ok(manager)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::optimization::distributed::transport::InMemoryTransport;
// A freshly built Raft node is a follower at term 0 with an empty log.
#[test]
fn test_raft_consensus_creation() {
let config = ConsensusConfig::default();
let peers = vec!["node1".to_string(), "node2".to_string()];
let raft = RaftConsensus::new("node0".to_string(), peers, config);
assert_eq!(raft.current_term(), 0);
assert_eq!(*raft.current_state(), NodeState::Follower);
assert_eq!(raft.log_length(), 0);
}
// Without a transport no votes can arrive, so the node stays a candidate
// after bumping its term.
#[test]
fn test_raft_election_without_transport() {
let config = ConsensusConfig::default();
let peers = vec!["node1".to_string(), "node2".to_string()];
let mut raft = RaftConsensus::new("node0".to_string(), peers, config);
raft.start_election().expect("election should not fail");
assert_eq!(*raft.current_state(), NodeState::Candidate);
assert_eq!(raft.current_term(), 1);
}
// Four nodes that are all healthy satisfy the PBFT quorum check.
#[test]
fn test_pbft_consensus_creation() {
let config = ConsensusConfig::default();
let peers = vec![
"node1".to_string(),
"node2".to_string(),
"node3".to_string(),
];
let pbft = PbftConsensus::new("node0".to_string(), peers, config);
assert!(pbft.has_quorum());
}
// With no transport only the proposer votes, which is not a majority of
// three nodes.
#[test]
fn test_simple_majority_consensus() {
let config = ConsensusConfig::default();
let peers = vec!["node1".to_string(), "node2".to_string()];
let mut consensus = SimpleMajorityConsensus::new("node0".to_string(), peers, config);
let proposal_id = consensus
.submit_proposal(b"test proposal".to_vec())
.expect("submit should succeed");
assert!(!consensus.has_majority(&proposal_id));
}
// The factory can build each of the three implemented backends.
#[test]
fn test_consensus_factory() {
let config = ConsensusConfig::default();
let peers = vec!["node1".to_string()];
let raft = ConsensusFactory::create_consensus(
ConsensusAlgorithm::Raft,
"node0".to_string(),
peers.clone(),
config.clone(),
);
assert!(raft.is_ok());
let pbft = ConsensusFactory::create_consensus(
ConsensusAlgorithm::Pbft,
"node0".to_string(),
peers.clone(),
config.clone(),
);
assert!(pbft.is_ok());
let simple = ConsensusFactory::create_consensus(
ConsensusAlgorithm::SimpleMajority,
"node0".to_string(),
peers,
config,
);
assert!(simple.is_ok());
}
// Both peers' granted votes are queued in n0's inbox before the election
// starts, because `start_election` reads replies without blocking right after
// broadcasting its request.
#[test]
fn test_raft_leader_election_in_memory() {
let node_ids = ["n0", "n1", "n2"];
let mut transports = InMemoryTransport::create_network(&node_ids);
let config = ConsensusConfig::default();
let peers_for_n0 = vec!["n1".to_string(), "n2".to_string()];
let mut node0 = RaftConsensus::new("n0".to_string(), peers_for_n0, config.clone());
let (_, t0) = transports.remove(0);
node0.set_transport(Arc::new(t0));
let (_, t1) = transports.remove(0);
let (_, t2) = transports.remove(0);
t1.send(
"n0",
ConsensusMessage::RequestVoteResponse {
term: 1,
granted: true,
},
)
.expect("send should succeed");
t2.send(
"n0",
ConsensusMessage::RequestVoteResponse {
term: 1,
granted: true,
},
)
.expect("send should succeed");
node0.start_election().expect("election should succeed");
assert_eq!(
*node0.current_state(),
NodeState::Leader,
"node 0 should be leader after majority votes"
);
assert_eq!(node0.current_term(), 1);
}
// The leader's acknowledgements are queued in advance; after `propose` the
// followers poll their inboxes and must each hold exactly one log entry.
#[test]
fn test_raft_log_replication() {
let node_ids = ["n0", "n1", "n2"];
let mut transports = InMemoryTransport::create_network(&node_ids);
let config = ConsensusConfig::default();
let mut node1 = RaftConsensus::new(
"n1".to_string(),
vec!["n0".to_string(), "n2".to_string()],
config.clone(),
);
let mut node2 = RaftConsensus::new(
"n2".to_string(),
vec!["n0".to_string(), "n1".to_string()],
config.clone(),
);
let (_, t0) = transports.remove(0);
let (_, t1) = transports.remove(0);
let (_, t2) = transports.remove(0);
let t1_arc: Arc<dyn Transport> = Arc::new(t1);
let t2_arc: Arc<dyn Transport> = Arc::new(t2);
node1.set_transport(Arc::clone(&t1_arc));
node2.set_transport(Arc::clone(&t2_arc));
let mut node0 = RaftConsensus::new(
"n0".to_string(),
vec!["n1".to_string(), "n2".to_string()],
config.clone(),
);
node0.set_transport(Arc::new(t0));
node0.become_leader().expect("become_leader should succeed");
t1_arc
.send(
"n0",
ConsensusMessage::AppendEntriesResponse {
term: 0,
success: true,
match_index: 1,
},
)
.expect("send ack n1->n0");
t2_arc
.send(
"n0",
ConsensusMessage::AppendEntriesResponse {
term: 0,
success: true,
match_index: 1,
},
)
.expect("send ack n2->n0");
let entry_id = node0
.propose(b"hello world".to_vec())
.expect("propose should succeed");
assert!(!entry_id.is_empty());
node1.poll_messages().expect("poll n1");
node2.poll_messages().expect("poll n2");
assert_eq!(node1.log_length(), 1, "follower n1 should have 1 log entry");
assert_eq!(node2.log_length(), 1, "follower n2 should have 1 log entry");
}
// The replicas' prepare and commit messages are injected into the primary's
// own inbox through the primary's transport handle before the round starts,
// so `start_consensus` finds them when it reads replies.
#[test]
fn test_pbft_consensus_three_nodes() {
let node_ids = ["p0", "r1", "r2"];
let transports = InMemoryTransport::create_network(&node_ids);
let config = ConsensusConfig::default();
let mut primary = PbftConsensus::new(
"p0".to_string(),
vec!["r1".to_string(), "r2".to_string()],
config,
);
let (_, tp) = transports.into_iter().next().expect("primary transport");
let tp_arc: Arc<dyn Transport> = Arc::new(tp);
let digest = primary.compute_digest(b"cmd");
tp_arc
.send(
"p0",
ConsensusMessage::PbftPrepare {
view: 0,
sequence: 1,
digest: digest.clone(),
node_id: "r1".to_string(),
},
)
.expect("send prepare r1");
tp_arc
.send(
"p0",
ConsensusMessage::PbftPrepare {
view: 0,
sequence: 1,
digest: digest.clone(),
node_id: "r2".to_string(),
},
)
.expect("send prepare r2");
tp_arc
.send(
"p0",
ConsensusMessage::PbftCommit {
view: 0,
sequence: 1,
digest: digest.clone(),
node_id: "r1".to_string(),
},
)
.expect("send commit r1");
tp_arc
.send(
"p0",
ConsensusMessage::PbftCommit {
view: 0,
sequence: 1,
digest,
node_id: "r2".to_string(),
},
)
.expect("send commit r2");
primary.set_transport(Arc::clone(&tp_arc));
let result = primary.start_consensus(b"cmd".to_vec());
assert!(
result.is_ok(),
"PBFT consensus should succeed: {:?}",
result
);
}
}