Skip to main content

scirs2_metrics/optimization/distributed/consensus/
mod.rs

1//! Consensus algorithms for distributed coordination
2//!
3//! This module provides implementations of various consensus algorithms:
4//! - Raft consensus algorithm
5//! - Practical Byzantine Fault Tolerance (PBFT)
6//! - Proof of Stake consensus
7//! - Simple majority voting
8
9use crate::error::{MetricsError, Result};
10use crate::optimization::distributed::transport::{ConsensusMessage, Transport};
11use scirs2_core::random::{Rng, RngExt};
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, VecDeque};
14use std::net::SocketAddr;
15use std::sync::Arc;
16use std::time::{Duration, Instant, SystemTime};
17
18pub use super::config::{ConsensusAlgorithm, ConsensusConfig};
19
20/// Trait for consensus algorithm implementations
21pub trait ConsensusManager: Send + Sync {
22    /// Start the consensus algorithm
23    fn start(&mut self) -> Result<()>;
24    /// Submit a proposal for consensus
25    fn propose(&mut self, data: Vec<u8>) -> Result<String>;
26    /// Get the current consensus state
27    fn get_state(&self) -> ConsensusState;
28}
29
30/// Raft consensus algorithm implementation
31pub struct RaftConsensus {
32    /// Node ID
33    node_id: String,
34    /// Current term
35    current_term: u64,
36    /// Voted for in current term
37    voted_for: Option<String>,
38    /// Log entries
39    log: Vec<LogEntry>,
40    /// Current state
41    state: NodeState,
42    /// Known peers
43    peers: HashMap<String, PeerState>,
44    /// Configuration
45    config: ConsensusConfig,
46    /// Last heartbeat time
47    last_heartbeat: Instant,
48    /// Election timeout
49    election_timeout: Duration,
50    /// Next index for each peer
51    next_index: HashMap<String, usize>,
52    /// Match index for each peer
53    match_index: HashMap<String, usize>,
54    /// Commit index
55    commit_index: u64,
56    /// Optional transport for inter-node messaging
57    transport: Option<Arc<dyn Transport>>,
58}
59
60impl std::fmt::Debug for RaftConsensus {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        f.debug_struct("RaftConsensus")
63            .field("node_id", &self.node_id)
64            .field("current_term", &self.current_term)
65            .field("state", &self.state)
66            .finish()
67    }
68}
69
70impl RaftConsensus {
71    /// Create a new Raft consensus instance
72    pub fn new(node_id: String, peers: Vec<String>, config: ConsensusConfig) -> Self {
73        let mut peer_states = HashMap::new();
74        for peer in peers {
75            peer_states.insert(
76                peer.clone(),
77                PeerState {
78                    id: peer,
79                    last_seen: Instant::now(),
80                    is_healthy: true,
81                    address: None,
82                },
83            );
84        }
85
86        Self {
87            node_id,
88            current_term: 0,
89            voted_for: None,
90            log: vec![],
91            state: NodeState::Follower,
92            peers: peer_states,
93            config,
94            last_heartbeat: Instant::now(),
95            election_timeout: Duration::from_millis(5000),
96            next_index: HashMap::new(),
97            match_index: HashMap::new(),
98            commit_index: 0,
99            transport: None,
100        }
101    }
102
103    /// Attach a transport layer (required for network-connected consensus).
104    pub fn set_transport(&mut self, t: Arc<dyn Transport>) {
105        self.transport = Some(t);
106    }
107
108    /// Drain the receive queue and apply any incoming messages.
109    pub fn poll_messages(&mut self) -> Result<()> {
110        // Clone Arc to avoid borrowing self while mutating it.
111        let transport = match self.transport.clone() {
112            Some(t) => t,
113            None => return Ok(()),
114        };
115
116        // Drain up to a reasonable batch to avoid blocking forever.
117        let max_msgs = 256;
118        for _ in 0..max_msgs {
119            match transport.try_recv() {
120                None => break,
121                Some((from, msg)) => self.handle_message(from, msg)?,
122            }
123        }
124        Ok(())
125    }
126
127    /// Route an incoming message to the appropriate handler.
128    fn handle_message(&mut self, from: String, msg: ConsensusMessage) -> Result<()> {
129        match msg {
130            ConsensusMessage::RequestVote {
131                term,
132                candidate_id,
133                last_log_index: _,
134                last_log_term: _,
135            } => {
136                let granted = self.handle_vote_request(term, candidate_id)?;
137                if let Some(ref transport) = self.transport.clone() {
138                    let _ = transport.send(
139                        &from,
140                        ConsensusMessage::RequestVoteResponse {
141                            term: self.current_term,
142                            granted,
143                        },
144                    );
145                }
146            }
147            ConsensusMessage::AppendEntries {
148                term,
149                leader_id,
150                prev_log_index,
151                prev_log_term: _,
152                entries,
153                leader_commit,
154            } => {
155                let success =
156                    self.handle_append_entries(term, &leader_id, entries, leader_commit)?;
157                let match_index = prev_log_index + if success { 1 } else { 0 };
158                if let Some(ref transport) = self.transport.clone() {
159                    let _ = transport.send(
160                        &from,
161                        ConsensusMessage::AppendEntriesResponse {
162                            term: self.current_term,
163                            success,
164                            match_index,
165                        },
166                    );
167                }
168            }
169            ConsensusMessage::RequestVoteResponse { term, granted } => {
170                if term > self.current_term {
171                    self.current_term = term;
172                    self.state = NodeState::Follower;
173                    self.voted_for = None;
174                }
175                // Vote counting is done in start_election; responses arriving
176                // asynchronously are recorded but leader promotion is deferred.
177                if granted && self.state == NodeState::Candidate {
178                    // We cannot easily count here without state, so we let
179                    // start_election do synchronous counting for the MVP.
180                }
181            }
182            ConsensusMessage::AppendEntriesResponse {
183                term,
184                success,
185                match_index,
186            } => {
187                if term > self.current_term {
188                    self.current_term = term;
189                    self.state = NodeState::Follower;
190                }
191                if success && match_index > self.commit_index {
192                    self.commit_index = match_index;
193                }
194            }
195            _ => {} // PBFT messages are not for Raft nodes
196        }
197        Ok(())
198    }
199
200    /// Handle an AppendEntries RPC from the leader.
201    pub fn handle_append_entries(
202        &mut self,
203        term: u64,
204        leader_id: &str,
205        entries: Vec<Vec<u8>>,
206        leader_commit: u64,
207    ) -> Result<bool> {
208        if term < self.current_term {
209            return Ok(false);
210        }
211        // Recognise the leader.
212        self.current_term = term;
213        self.state = NodeState::Follower;
214        self.last_heartbeat = Instant::now();
215        self.voted_for = Some(leader_id.to_string());
216
217        // Append entries.
218        for raw in entries {
219            let entry = LogEntry {
220                term,
221                index: self.log.len() as u64 + 1,
222                command: Command::UserData(raw),
223                timestamp: SystemTime::now(),
224            };
225            self.log.push(entry);
226        }
227
228        // Update commit index.
229        if leader_commit > self.commit_index {
230            self.commit_index = leader_commit.min(self.log.len() as u64);
231        }
232
233        Ok(true)
234    }
235
236    /// Start an election
237    pub fn start_election(&mut self) -> Result<()> {
238        self.current_term += 1;
239        self.state = NodeState::Candidate;
240        self.voted_for = Some(self.node_id.clone());
241        self.last_heartbeat = Instant::now();
242
243        // Reset election timeout with randomization
244        let base_timeout = self.config.election_timeout_ms;
245        let jitter = scirs2_core::random::rng().random_range(0..base_timeout / 2);
246        self.election_timeout = Duration::from_millis(base_timeout + jitter);
247
248        // Broadcast RequestVote to all peers (if transport is available).
249        let transport = self.transport.clone();
250        if let Some(ref transport) = transport {
251            let peers = transport.peer_ids();
252            let last_log_index = self.log.len() as u64;
253            let last_log_term = self.log.last().map(|e| e.term).unwrap_or(0);
254
255            let msg = ConsensusMessage::RequestVote {
256                term: self.current_term,
257                candidate_id: self.node_id.clone(),
258                last_log_index,
259                last_log_term,
260            };
261            let _ = transport.broadcast(msg);
262
263            // Collect responses with a synchronous short-circuit.
264            let mut votes = 1u64; // vote for self
265            let total = peers.len() as u64 + 1;
266
267            for _ in 0..peers.len() {
268                if let Some((_from, inner_msg)) = transport.try_recv() {
269                    if let ConsensusMessage::RequestVoteResponse { term, granted } = inner_msg {
270                        if term > self.current_term {
271                            self.current_term = term;
272                            self.state = NodeState::Follower;
273                            return Ok(());
274                        }
275                        if granted {
276                            votes += 1;
277                        }
278                    }
279                }
280            }
281
282            if votes > total / 2 {
283                self.become_leader()?;
284            }
285        }
286
287        Ok(())
288    }
289
290    /// Append entries to log
291    pub fn append_entries(&mut self, entries: Vec<LogEntry>) -> Result<bool> {
292        for entry in entries {
293            self.log.push(entry);
294        }
295        Ok(true)
296    }
297
298    /// Handle vote request
299    pub fn handle_vote_request(&mut self, term: u64, candidate_id: String) -> Result<bool> {
300        if term > self.current_term {
301            self.current_term = term;
302            self.voted_for = None;
303            self.state = NodeState::Follower;
304        }
305
306        let can_vote = self.voted_for.is_none() || self.voted_for.as_ref() == Some(&candidate_id);
307
308        if term == self.current_term && can_vote {
309            self.voted_for = Some(candidate_id);
310            Ok(true)
311        } else {
312            Ok(false)
313        }
314    }
315
316    /// Become leader
317    pub fn become_leader(&mut self) -> Result<()> {
318        self.state = NodeState::Leader;
319
320        // Initialize next_index and match_index for all peers
321        let log_length = self.log.len();
322        for peer_id in self.peers.keys().cloned().collect::<Vec<_>>() {
323            self.next_index.insert(peer_id.clone(), log_length);
324            self.match_index.insert(peer_id, 0);
325        }
326
327        Ok(())
328    }
329
330    /// Send heartbeat to peers
331    pub fn send_heartbeat(&mut self) -> Result<()> {
332        if self.state != NodeState::Leader {
333            return Ok(());
334        }
335
336        self.last_heartbeat = Instant::now();
337
338        let transport = self.transport.clone();
339        if let Some(ref transport) = transport {
340            let msg = ConsensusMessage::AppendEntries {
341                term: self.current_term,
342                leader_id: self.node_id.clone(),
343                prev_log_index: self.commit_index,
344                prev_log_term: self
345                    .log
346                    .get(self.commit_index as usize)
347                    .map(|e| e.term)
348                    .unwrap_or(0),
349                entries: vec![],
350                leader_commit: self.commit_index,
351            };
352            let _ = transport.broadcast(msg);
353        }
354
355        Ok(())
356    }
357
358    /// Check if election timeout has occurred
359    pub fn is_election_timeout(&self) -> bool {
360        self.last_heartbeat.elapsed() > self.election_timeout
361    }
362
363    /// Get current log length
364    pub fn log_length(&self) -> usize {
365        self.log.len()
366    }
367
368    /// Get current term
369    pub fn current_term(&self) -> u64 {
370        self.current_term
371    }
372
373    /// Get current state
374    pub fn current_state(&self) -> &NodeState {
375        &self.state
376    }
377
378    /// Get commit index
379    pub fn commit_index(&self) -> u64 {
380        self.commit_index
381    }
382}
383
384impl ConsensusManager for RaftConsensus {
385    fn start(&mut self) -> Result<()> {
386        self.last_heartbeat = Instant::now();
387        Ok(())
388    }
389
390    fn propose(&mut self, data: Vec<u8>) -> Result<String> {
391        if self.state != NodeState::Leader {
392            return Err(MetricsError::ConsensusError(
393                "Only leader can propose entries".to_string(),
394            ));
395        }
396
397        let entry = LogEntry {
398            term: self.current_term,
399            index: self.log.len() as u64 + 1,
400            command: Command::UserData(data),
401            timestamp: SystemTime::now(),
402        };
403
404        let entry_id = format!("entry_{}_{}", self.current_term, entry.index);
405        self.log.push(entry.clone());
406
407        // Replicate to followers via transport (if available).
408        let transport = self.transport.clone();
409        if let Some(ref transport) = transport {
410            let entry_bytes = serde_json::to_vec(&entry.command).unwrap_or_default();
411            let prev_log_index = self.log.len() as u64 - 1;
412            let prev_log_term = if self.log.len() > 1 {
413                self.log[self.log.len() - 2].term
414            } else {
415                0
416            };
417
418            let msg = ConsensusMessage::AppendEntries {
419                term: self.current_term,
420                leader_id: self.node_id.clone(),
421                prev_log_index,
422                prev_log_term,
423                entries: vec![entry_bytes],
424                leader_commit: self.commit_index,
425            };
426            let _ = transport.broadcast(msg);
427
428            // Collect acknowledgements — simple synchronous majority wait.
429            let peers = transport.peer_ids();
430            let total = peers.len() as u64 + 1;
431            let mut acks = 1u64; // count self
432
433            for _ in 0..peers.len() {
434                if let Some((_from, resp)) = transport.try_recv() {
435                    if let ConsensusMessage::AppendEntriesResponse {
436                        success,
437                        match_index,
438                        ..
439                    } = resp
440                    {
441                        if success {
442                            acks += 1;
443                            if match_index > self.commit_index {
444                                self.commit_index = match_index;
445                            }
446                        }
447                    }
448                }
449            }
450
451            if acks > total / 2 {
452                self.commit_index = self.log.len() as u64;
453            }
454        }
455
456        Ok(entry_id)
457    }
458
459    fn get_state(&self) -> ConsensusState {
460        ConsensusState {
461            term: self.current_term,
462            leader: if self.state == NodeState::Leader {
463                Some(self.node_id.clone())
464            } else {
465                None
466            },
467            node_state: self.state.clone(),
468            log_length: self.log.len(),
469            committed_index: self.commit_index as usize,
470        }
471    }
472}
473
/// Snapshot of a consensus instance's state, as returned by
/// `ConsensusManager::get_state`.
///
/// Some fields mean different things for each algorithm; see the field notes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsensusState {
    /// Current term (Raft) or view number (PBFT); simple majority reports 0.
    pub term: u64,
    /// Current leader, if known. Raft reports only itself, and only while it
    /// is leader; PBFT reports a synthetic `primary_<n>` label; simple majority
    /// reports its own id.
    pub leader: Option<String>,
    /// Role of this node. PBFT always reports `Follower`; simple majority
    /// always reports `Leader`.
    pub node_state: NodeState,
    /// Number of locally stored records: log entries (Raft), logged protocol
    /// messages (PBFT) or retained vote tallies (simple majority).
    pub log_length: usize,
    /// Last committed index (Raft); PBFT and simple majority report 0.
    pub committed_index: usize,
}
488
/// Role of a node in Raft.
///
/// Also reused by the PBFT and simple-majority implementations when they
/// report their state.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeState {
    /// Follower state: accepts entries from a leader and grants votes.
    Follower,
    /// Candidate state (during an election).
    Candidate,
    /// Leader state: accepts proposals and sends heartbeats.
    Leader,
}
499
/// One entry of the Raft log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Term when the entry was created. On followers, this is the term of the
    /// AppendEntries message that delivered it.
    pub term: u64,
    /// Position in the log. When `RaftConsensus` creates an entry it sets this
    /// to the log length + 1, so indices are 1-based.
    pub index: u64,
    /// Command to apply.
    pub command: Command,
    /// Local wall-clock time at which this node created the entry.
    pub timestamp: SystemTime,
}
512
/// Commands that can be stored in the log.
///
/// `RaftConsensus` only creates `UserData` entries: for leader proposals and
/// for entries a follower receives.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Command {
    /// No-op command, intended for heartbeats. Note that
    /// `RaftConsensus::send_heartbeat` sends an empty entry list rather than
    /// a `NoOp`.
    NoOp,
    /// Opaque user payload.
    UserData(Vec<u8>),
    /// Cluster membership change.
    ConfigChange {
        /// Type of change.
        change_type: ConfigChangeType,
        /// Node ID.
        node_id: String,
        /// Node address, if known.
        address: Option<SocketAddr>,
    },
    /// Snapshot command.
    Snapshot {
        /// Last log index covered by the snapshot.
        last_included_index: u64,
        /// Term of that last included entry.
        last_included_term: u64,
        /// Snapshot data.
        data: Vec<u8>,
    },
}
539
/// Kind of membership change carried by `Command::ConfigChange`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConfigChangeType {
    /// Add a new node.
    AddNode,
    /// Remove an existing node.
    RemoveNode,
    /// Update a node's address.
    UpdateNode,
}
550
/// Bookkeeping for one known peer.
///
/// The constructors in this module create every peer as healthy, with
/// `last_seen` set to the registration time and no address.
#[derive(Debug, Clone)]
pub struct PeerState {
    /// Peer ID (also the key of the owning peer map).
    pub id: String,
    /// Last time we heard from this peer.
    pub last_seen: Instant,
    /// Whether the peer is considered healthy; `PbftConsensus::has_quorum`
    /// counts only healthy peers.
    pub is_healthy: bool,
    /// Peer network address, if known.
    pub address: Option<SocketAddr>,
}
563
564// ─────────────────────────────────────────────────────────────────────────────
565// PBFT Consensus
566// ─────────────────────────────────────────────────────────────────────────────
567
568/// PBFT consensus implementation
569pub struct PbftConsensus {
570    /// Node ID
571    node_id: String,
572    /// Current view
573    current_view: u64,
574    /// Current sequence number
575    sequence_number: u64,
576    /// Known peers
577    peers: HashMap<String, PeerState>,
578    /// Configuration
579    config: ConsensusConfig,
580    /// Message log for three-phase protocol
581    message_log: Vec<PbftMessage>,
582    /// Optional transport layer
583    transport: Option<Arc<dyn Transport>>,
584}
585
586impl std::fmt::Debug for PbftConsensus {
587    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
588        f.debug_struct("PbftConsensus")
589            .field("node_id", &self.node_id)
590            .field("current_view", &self.current_view)
591            .finish()
592    }
593}
594
595impl PbftConsensus {
596    /// Create a new PBFT consensus instance
597    pub fn new(node_id: String, peers: Vec<String>, config: ConsensusConfig) -> Self {
598        let mut peer_states = HashMap::new();
599        for peer in peers {
600            peer_states.insert(
601                peer.clone(),
602                PeerState {
603                    id: peer,
604                    last_seen: Instant::now(),
605                    is_healthy: true,
606                    address: None,
607                },
608            );
609        }
610
611        Self {
612            node_id,
613            current_view: 0,
614            sequence_number: 0,
615            peers: peer_states,
616            config,
617            message_log: vec![],
618            transport: None,
619        }
620    }
621
622    /// Attach a transport layer.
623    pub fn set_transport(&mut self, t: Arc<dyn Transport>) {
624        self.transport = Some(t);
625    }
626
627    /// Check if we have enough replicas for consensus
628    pub fn has_quorum(&self) -> bool {
629        let total_nodes = self.peers.len() + 1; // +1 for self
630        let healthy_nodes = self.peers.values().filter(|p| p.is_healthy).count() + 1;
631
632        // PBFT requires 3f + 1 nodes to tolerate f Byzantine failures.
633        // We use 2f + 1 for non-Byzantine consensus.
634        healthy_nodes > (total_nodes * 2 / 3)
635    }
636
637    /// Number of faulty nodes we can tolerate.
638    fn f(&self) -> usize {
639        // 3f + 1 <= n  =>  f <= (n-1)/3
640        let n = self.peers.len() + 1;
641        (n - 1) / 3
642    }
643
644    /// Start PBFT three-phase protocol
645    pub fn start_consensus(&mut self, request: Vec<u8>) -> Result<String> {
646        if !self.has_quorum() {
647            return Err(MetricsError::ConsensusError(
648                "Insufficient nodes for consensus".to_string(),
649            ));
650        }
651
652        self.sequence_number += 1;
653        let digest = self.compute_digest(&request);
654        let seq = self.sequence_number;
655        let view = self.current_view;
656
657        // Phase 1: Pre-prepare — broadcast to all replicas.
658        let pre_prepare = PbftMessage {
659            message_type: PbftMessageType::PrePrepare,
660            view,
661            sequence: seq,
662            digest: digest.clone(),
663            node_id: self.node_id.clone(),
664            data: request.clone(),
665            timestamp: SystemTime::now(),
666        };
667        self.message_log.push(pre_prepare.clone());
668
669        let transport = self.transport.clone();
670        if let Some(ref transport) = transport {
671            let _ = transport.broadcast(ConsensusMessage::PbftPrePrepare {
672                view,
673                sequence: seq,
674                digest: digest.clone(),
675                data: request,
676                node_id: self.node_id.clone(),
677            });
678
679            // Phase 2: Prepare — collect 2f+1 prepare messages.
680            let quorum = 2 * self.f() + 1;
681            let peers_count = self.peers.len();
682            let mut prepares = 1usize; // count self
683
684            for _ in 0..peers_count {
685                if let Some((_from, msg)) = transport.try_recv() {
686                    if let ConsensusMessage::PbftPrepare {
687                        sequence: s,
688                        digest: d,
689                        ..
690                    } = &msg
691                    {
692                        if *s == seq && *d == digest {
693                            prepares += 1;
694                            let prepare_msg = PbftMessage {
695                                message_type: PbftMessageType::Prepare,
696                                view,
697                                sequence: seq,
698                                digest: digest.clone(),
699                                node_id: self.node_id.clone(),
700                                data: vec![],
701                                timestamp: SystemTime::now(),
702                            };
703                            self.message_log.push(prepare_msg);
704                        }
705                    }
706                }
707            }
708
709            // Phase 3: Commit — if enough prepares, broadcast commit.
710            if prepares >= quorum {
711                let _ = transport.broadcast(ConsensusMessage::PbftCommit {
712                    view,
713                    sequence: seq,
714                    digest: digest.clone(),
715                    node_id: self.node_id.clone(),
716                });
717
718                // Collect 2f+1 commits.
719                let mut commits = 1usize;
720                for _ in 0..peers_count {
721                    if let Some((_from, msg)) = transport.try_recv() {
722                        if let ConsensusMessage::PbftCommit {
723                            sequence: s,
724                            digest: d,
725                            ..
726                        } = &msg
727                        {
728                            if *s == seq && *d == digest {
729                                commits += 1;
730                                let commit_msg = PbftMessage {
731                                    message_type: PbftMessageType::Commit,
732                                    view,
733                                    sequence: seq,
734                                    digest: digest.clone(),
735                                    node_id: self.node_id.clone(),
736                                    data: vec![],
737                                    timestamp: SystemTime::now(),
738                                };
739                                self.message_log.push(commit_msg);
740                            }
741                        }
742                    }
743                }
744
745                if commits < quorum {
746                    return Err(MetricsError::ConsensusError(
747                        "PBFT commit quorum not reached".to_string(),
748                    ));
749                }
750            } else {
751                return Err(MetricsError::ConsensusError(
752                    "PBFT prepare quorum not reached".to_string(),
753                ));
754            }
755        }
756
757        Ok(format!("pbft_{}_{}", view, seq))
758    }
759
760    fn compute_digest(&self, data: &[u8]) -> String {
761        // Simplified hash computation using DefaultHasher.
762        use std::collections::hash_map::DefaultHasher;
763        use std::hash::{Hash, Hasher};
764
765        let mut hasher = DefaultHasher::new();
766        data.hash(&mut hasher);
767        format!("{:x}", hasher.finish())
768    }
769}
770
771impl ConsensusManager for PbftConsensus {
772    fn start(&mut self) -> Result<()> {
773        self.current_view = 0;
774        self.sequence_number = 0;
775        Ok(())
776    }
777
778    fn propose(&mut self, data: Vec<u8>) -> Result<String> {
779        self.start_consensus(data)
780    }
781
782    fn get_state(&self) -> ConsensusState {
783        ConsensusState {
784            term: self.current_view,
785            leader: Some(format!(
786                "primary_{}",
787                self.current_view % (self.peers.len() + 1) as u64
788            )),
789            node_state: NodeState::Follower, // PBFT is not leader-based
790            log_length: self.message_log.len(),
791            committed_index: 0,
792        }
793    }
794}
795
/// Phase or kind of a logged PBFT message.
///
/// `PbftConsensus::start_consensus` records only `PrePrepare`, `Prepare` and
/// `Commit`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PbftMessageType {
    /// Pre-prepare phase (primary assigns a sequence number).
    PrePrepare,
    /// Prepare phase.
    Prepare,
    /// Commit phase.
    Commit,
    /// View change.
    ViewChange,
    /// New view.
    NewView,
}
810
/// Entry in a PBFT replica's local message log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PbftMessage {
    /// Message type.
    pub message_type: PbftMessageType,
    /// View number the message belongs to.
    pub view: u64,
    /// Sequence number of the request.
    pub sequence: u64,
    /// Hex digest of the request payload.
    pub digest: String,
    /// ID of the node this entry is attributed to.
    pub node_id: String,
    /// Request payload for pre-prepare entries; empty for prepare/commit.
    pub data: Vec<u8>,
    /// Local wall-clock time at which the entry was recorded.
    pub timestamp: SystemTime,
}
829
830// ─────────────────────────────────────────────────────────────────────────────
831// SimpleMajorityConsensus
832// ─────────────────────────────────────────────────────────────────────────────
833
834/// Simple majority consensus (for testing/fallback)
835pub struct SimpleMajorityConsensus {
836    /// Node ID
837    node_id: String,
838    /// Known peers
839    peers: HashMap<String, PeerState>,
840    /// Vote history
841    votes: VecDeque<Vote>,
842    /// Configuration
843    config: ConsensusConfig,
844    /// Optional transport
845    transport: Option<Arc<dyn Transport>>,
846}
847
848impl std::fmt::Debug for SimpleMajorityConsensus {
849    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
850        f.debug_struct("SimpleMajorityConsensus")
851            .field("node_id", &self.node_id)
852            .finish()
853    }
854}
855
856impl SimpleMajorityConsensus {
857    /// Create a new simple majority consensus instance
858    pub fn new(node_id: String, peers: Vec<String>, config: ConsensusConfig) -> Self {
859        let mut peer_states = HashMap::new();
860        for peer in peers {
861            peer_states.insert(
862                peer.clone(),
863                PeerState {
864                    id: peer,
865                    last_seen: Instant::now(),
866                    is_healthy: true,
867                    address: None,
868                },
869            );
870        }
871
872        Self {
873            node_id,
874            peers: peer_states,
875            votes: VecDeque::new(),
876            config,
877            transport: None,
878        }
879    }
880
881    /// Attach a transport layer.
882    pub fn set_transport(&mut self, t: Arc<dyn Transport>) {
883        self.transport = Some(t);
884    }
885
886    /// Submit a proposal for voting
887    pub fn submit_proposal(&mut self, proposal: Vec<u8>) -> Result<String> {
888        let proposal_id = format!(
889            "proposal_{}_{}",
890            SystemTime::now()
891                .duration_since(std::time::UNIX_EPOCH)
892                .map(|d| d.as_millis())
893                .unwrap_or(0),
894            scirs2_core::random::rng().random::<u64>()
895        );
896
897        let mut vote = Vote {
898            proposal_id: proposal_id.clone(),
899            proposal_data: proposal,
900            votes_for: 1, // Self vote
901            votes_against: 0,
902            voters: vec![self.node_id.clone()],
903            timestamp: SystemTime::now(),
904        };
905
906        // Broadcast vote request to peers (if transport available).
907        let transport = self.transport.clone();
908        if let Some(ref transport) = transport {
909            let _ = transport.broadcast(ConsensusMessage::RequestVote {
910                term: 0,
911                candidate_id: proposal_id.clone(),
912                last_log_index: 0,
913                last_log_term: 0,
914            });
915
916            // Collect peer votes.
917            let peers_count = self.peers.len();
918            for _ in 0..peers_count {
919                if let Some((_from, msg)) = transport.try_recv() {
920                    if let ConsensusMessage::RequestVoteResponse { granted, .. } = msg {
921                        if granted {
922                            vote.votes_for += 1;
923                        } else {
924                            vote.votes_against += 1;
925                        }
926                    }
927                }
928            }
929        }
930
931        self.votes.push_back(vote);
932
933        // Cleanup old votes.
934        while self.votes.len() > 1000 {
935            self.votes.pop_front();
936        }
937
938        Ok(proposal_id)
939    }
940
941    /// Check if proposal has majority
942    pub fn has_majority(&self, proposal_id: &str) -> bool {
943        if let Some(vote) = self.votes.iter().find(|v| v.proposal_id == proposal_id) {
944            let total_nodes = self.peers.len() + 1; // +1 for self
945            vote.votes_for > total_nodes / 2
946        } else {
947            false
948        }
949    }
950}
951
952impl ConsensusManager for SimpleMajorityConsensus {
953    fn start(&mut self) -> Result<()> {
954        self.votes.clear();
955        Ok(())
956    }
957
958    fn propose(&mut self, data: Vec<u8>) -> Result<String> {
959        self.submit_proposal(data)
960    }
961
962    fn get_state(&self) -> ConsensusState {
963        ConsensusState {
964            term: 0,
965            leader: Some(self.node_id.clone()),
966            node_state: NodeState::Leader,
967            log_length: self.votes.len(),
968            committed_index: 0,
969        }
970    }
971}
972
/// Tally record for a single proposal under simple-majority consensus.
#[derive(Clone, Debug)]
pub struct Vote {
    /// Identifier generated for the proposal when it was submitted
    pub proposal_id: String,
    /// Raw payload that was proposed
    pub proposal_data: Vec<u8>,
    /// Count of votes in favour
    pub votes_for: usize,
    /// Count of votes opposed
    pub votes_against: usize,
    /// Identifiers of the nodes that cast a vote
    pub voters: Vec<String>,
    /// Wall-clock time at which the record was created
    pub timestamp: SystemTime,
}
989
990// ─────────────────────────────────────────────────────────────────────────────
991// Factory
992// ─────────────────────────────────────────────────────────────────────────────
993
994/// Factory for creating consensus instances
995pub struct ConsensusFactory;
996
997impl ConsensusFactory {
998    /// Create a consensus manager based on configuration
999    pub fn create_consensus(
1000        algorithm: ConsensusAlgorithm,
1001        node_id: String,
1002        peers: Vec<String>,
1003        config: ConsensusConfig,
1004    ) -> Result<Box<dyn ConsensusManager>> {
1005        match algorithm {
1006            ConsensusAlgorithm::Raft => Ok(Box::new(RaftConsensus::new(node_id, peers, config))),
1007            ConsensusAlgorithm::Pbft => Ok(Box::new(PbftConsensus::new(node_id, peers, config))),
1008            ConsensusAlgorithm::SimpleMajority => Ok(Box::new(SimpleMajorityConsensus::new(
1009                node_id, peers, config,
1010            ))),
1011            _ => Err(MetricsError::ConsensusError(format!(
1012                "Consensus algorithm {:?} not implemented",
1013                algorithm
1014            ))),
1015        }
1016    }
1017}
1018
1019// ─────────────────────────────────────────────────────────────────────────────
1020// Tests
1021// ─────────────────────────────────────────────────────────────────────────────
1022
#[cfg(test)]
mod tests {
    // Tests for the consensus implementations in this module.
    //
    // The in-memory transport tests wire nodes together with `InMemoryTransport`
    // and pre-populate inboxes *before* invoking the code under test. The
    // election and proposal paths send and then drain their inbox within a
    // single synchronous call, so replies must already be queued. The order of
    // the statements in those tests therefore matters.
    use super::*;
    use crate::optimization::distributed::transport::InMemoryTransport;

    // ── Basic creation tests ──────────────────────────────────────────────────

    /// A freshly constructed Raft node is a follower in term 0 with an empty log.
    #[test]
    fn test_raft_consensus_creation() {
        let config = ConsensusConfig::default();
        let peers = vec!["node1".to_string(), "node2".to_string()];
        let raft = RaftConsensus::new("node0".to_string(), peers, config);

        assert_eq!(raft.current_term(), 0);
        assert_eq!(*raft.current_state(), NodeState::Follower);
        assert_eq!(raft.log_length(), 0);
    }

    /// With no transport attached, `start_election` still succeeds, and the node
    /// ends up as a candidate (not leader) in term 1.
    #[test]
    fn test_raft_election_without_transport() {
        let config = ConsensusConfig::default();
        let peers = vec!["node1".to_string(), "node2".to_string()];
        let mut raft = RaftConsensus::new("node0".to_string(), peers, config);

        raft.start_election().expect("election should not fail");
        assert_eq!(*raft.current_state(), NodeState::Candidate);
        assert_eq!(raft.current_term(), 1);
    }

    /// A PBFT node with three peers (four replicas in total) reports a quorum.
    #[test]
    fn test_pbft_consensus_creation() {
        let config = ConsensusConfig::default();
        let peers = vec![
            "node1".to_string(),
            "node2".to_string(),
            "node3".to_string(),
        ];
        let pbft = PbftConsensus::new("node0".to_string(), peers, config);

        assert!(pbft.has_quorum());
    }

    /// Without a transport only the self-vote is counted. That is 1 vote out of
    /// a 3-node cluster, which is not a strict majority.
    #[test]
    fn test_simple_majority_consensus() {
        let config = ConsensusConfig::default();
        let peers = vec!["node1".to_string(), "node2".to_string()];
        let mut consensus = SimpleMajorityConsensus::new("node0".to_string(), peers, config);

        let proposal_id = consensus
            .submit_proposal(b"test proposal".to_vec())
            .expect("submit should succeed");
        // Only self-vote, no transport => no majority
        assert!(!consensus.has_majority(&proposal_id));
    }

    /// The factory builds Raft, PBFT and simple-majority managers without error.
    #[test]
    fn test_consensus_factory() {
        let config = ConsensusConfig::default();
        let peers = vec!["node1".to_string()];

        let raft = ConsensusFactory::create_consensus(
            ConsensusAlgorithm::Raft,
            "node0".to_string(),
            peers.clone(),
            config.clone(),
        );
        assert!(raft.is_ok());

        let pbft = ConsensusFactory::create_consensus(
            ConsensusAlgorithm::Pbft,
            "node0".to_string(),
            peers.clone(),
            config.clone(),
        );
        assert!(pbft.is_ok());

        let simple = ConsensusFactory::create_consensus(
            ConsensusAlgorithm::SimpleMajority,
            "node0".to_string(),
            peers,
            config,
        );
        assert!(simple.is_ok());
    }

    // ── In-memory transport tests ─────────────────────────────────────────────

    /// Create a 3-node Raft network, simulate election on node 0,
    /// verify it becomes leader (nodes 1 and 2 respond with RequestVoteResponse).
    #[test]
    fn test_raft_leader_election_in_memory() {
        let node_ids = ["n0", "n1", "n2"];
        let mut transports = InMemoryTransport::create_network(&node_ids);

        // Build Raft nodes wired to the transport.
        let config = ConsensusConfig::default();
        let peers_for_n0 = vec!["n1".to_string(), "n2".to_string()];
        let mut node0 = RaftConsensus::new("n0".to_string(), peers_for_n0, config.clone());

        // Extract and attach transport for node 0.
        // NOTE(review): assumes `create_network` returns transports in the same
        // order as `node_ids`, so repeated `remove(0)` yields n0, n1, n2.
        let (_, t0) = transports.remove(0);
        node0.set_transport(Arc::new(t0));

        // Simulate followers answering vote requests.
        // Before calling start_election we need the follower transports to
        // reply with granted=true.  Because start_election sends then drains
        // synchronously we must pre-populate the n0 inbox with two votes.
        let (_, t1) = transports.remove(0);
        let (_, t2) = transports.remove(0);

        // Both followers pre-send a granted response directly into n0's channel.
        t1.send(
            "n0",
            ConsensusMessage::RequestVoteResponse {
                term: 1,
                granted: true,
            },
        )
        .expect("send should succeed");
        t2.send(
            "n0",
            ConsensusMessage::RequestVoteResponse {
                term: 1,
                granted: true,
            },
        )
        .expect("send should succeed");

        // Now run the election; it will drain the two pre-populated responses.
        node0.start_election().expect("election should succeed");

        // Node 0 should have promoted itself to leader.
        assert_eq!(
            *node0.current_state(),
            NodeState::Leader,
            "node 0 should be leader after majority votes"
        );
        assert_eq!(node0.current_term(), 1);
    }

    /// Leader proposes an entry and followers receive it via poll_messages.
    #[test]
    fn test_raft_log_replication() {
        let node_ids = ["n0", "n1", "n2"];
        let mut transports = InMemoryTransport::create_network(&node_ids);

        let config = ConsensusConfig::default();

        // Build follower nodes.
        let mut node1 = RaftConsensus::new(
            "n1".to_string(),
            vec!["n0".to_string(), "n2".to_string()],
            config.clone(),
        );
        let mut node2 = RaftConsensus::new(
            "n2".to_string(),
            vec!["n0".to_string(), "n1".to_string()],
            config.clone(),
        );

        // NOTE(review): relies on `create_network` preserving `node_ids` order.
        let (_, t0) = transports.remove(0);
        let (_, t1) = transports.remove(0);
        let (_, t2) = transports.remove(0);

        // Attach transports.
        // The follower transports are kept as shared `Arc`s so the test can also
        // use them directly below to send acknowledgements to n0.
        let t1_arc: Arc<dyn Transport> = Arc::new(t1);
        let t2_arc: Arc<dyn Transport> = Arc::new(t2);
        node1.set_transport(Arc::clone(&t1_arc));
        node2.set_transport(Arc::clone(&t2_arc));

        // Build leader separately (its transport must point at the same network).
        let mut node0 = RaftConsensus::new(
            "n0".to_string(),
            vec!["n1".to_string(), "n2".to_string()],
            config.clone(),
        );
        node0.set_transport(Arc::new(t0));

        // Force node 0 into leader state.
        node0.become_leader().expect("become_leader should succeed");

        // Pre-populate acknowledgements from followers so propose() can collect them.
        t1_arc
            .send(
                "n0",
                ConsensusMessage::AppendEntriesResponse {
                    term: 0,
                    success: true,
                    match_index: 1,
                },
            )
            .expect("send ack n1->n0");
        t2_arc
            .send(
                "n0",
                ConsensusMessage::AppendEntriesResponse {
                    term: 0,
                    success: true,
                    match_index: 1,
                },
            )
            .expect("send ack n2->n0");

        // Propose; the AppendEntries broadcast will reach n1 and n2.
        let entry_id = node0
            .propose(b"hello world".to_vec())
            .expect("propose should succeed");
        assert!(!entry_id.is_empty());

        // Followers process their inbox.
        node1.poll_messages().expect("poll n1");
        node2.poll_messages().expect("poll n2");

        // Both followers should have the entry in their log.
        assert_eq!(node1.log_length(), 1, "follower n1 should have 1 log entry");
        assert_eq!(node2.log_length(), 1, "follower n2 should have 1 log entry");
    }

    /// 3-replica PBFT cluster: propose a command, verify commit succeeds.
    ///
    /// In this MVP the primary is the only node wired to transport; it simulates
    /// the three-phase protocol synchronously.  Replicas pre-populate Prepare
    /// and Commit responses so the primary can collect quorum.
    #[test]
    fn test_pbft_consensus_three_nodes() {
        let node_ids = ["p0", "r1", "r2"];
        let transports = InMemoryTransport::create_network(&node_ids);

        let config = ConsensusConfig::default();
        let mut primary = PbftConsensus::new(
            "p0".to_string(),
            vec!["r1".to_string(), "r2".to_string()],
            config,
        );

        // Take the first transport, presumably p0's since "p0" is first in
        // `node_ids`; the replica transports are dropped unused.
        let (_, tp) = transports.into_iter().next().expect("primary transport");
        let tp_arc: Arc<dyn Transport> = Arc::new(tp);

        // The replica messages below are sent over the primary's own transport to
        // "p0"; the replica identity is carried only in each message's `node_id`.
        // They must all target the same digest/view/sequence the primary will use.
        // Pre-populate Prepare messages from r1 and r2.
        let digest = primary.compute_digest(b"cmd");
        tp_arc
            .send(
                "p0",
                ConsensusMessage::PbftPrepare {
                    view: 0,
                    sequence: 1,
                    digest: digest.clone(),
                    node_id: "r1".to_string(),
                },
            )
            .expect("send prepare r1");
        tp_arc
            .send(
                "p0",
                ConsensusMessage::PbftPrepare {
                    view: 0,
                    sequence: 1,
                    digest: digest.clone(),
                    node_id: "r2".to_string(),
                },
            )
            .expect("send prepare r2");

        // Pre-populate Commit messages from r1 and r2.
        tp_arc
            .send(
                "p0",
                ConsensusMessage::PbftCommit {
                    view: 0,
                    sequence: 1,
                    digest: digest.clone(),
                    node_id: "r1".to_string(),
                },
            )
            .expect("send commit r1");
        tp_arc
            .send(
                "p0",
                ConsensusMessage::PbftCommit {
                    view: 0,
                    sequence: 1,
                    digest,
                    node_id: "r2".to_string(),
                },
            )
            .expect("send commit r2");

        // Attach the transport only after the inbox is populated, then run the
        // full consensus round for the same command whose digest was used above.
        primary.set_transport(Arc::clone(&tp_arc));

        let result = primary.start_consensus(b"cmd".to_vec());
        assert!(
            result.is_ok(),
            "PBFT consensus should succeed: {:?}",
            result
        );
    }
}