exo_federation/
consensus.rs

//! Byzantine fault-tolerant consensus
//!
//! Implements PBFT-style consensus for state updates across the federation:
//! - Pre-prepare phase
//! - Prepare phase
//! - Commit phase
//! - Proof generation
8
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use crate::{Result, FederationError, PeerId, StateUpdate};
12
13/// Consensus message types
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub enum ConsensusMessage {
16    PrePrepare { proposal: SignedProposal },
17    Prepare { digest: Vec<u8>, sender: PeerId },
18    Commit { digest: Vec<u8>, sender: PeerId },
19}
20
21/// Signed proposal for a state update
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct SignedProposal {
24    pub update: StateUpdate,
25    pub sequence_number: u64,
26    pub signature: Vec<u8>,
27}
28
29/// Proof that consensus was reached
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct CommitProof {
32    pub update_id: String,
33    pub commit_messages: Vec<CommitMessage>,
34    pub timestamp: u64,
35}
36
37impl CommitProof {
38    /// Verify that proof contains sufficient commits
39    pub fn verify(&self, total_nodes: usize) -> bool {
40        let threshold = byzantine_threshold(total_nodes);
41        self.commit_messages.len() >= threshold
42    }
43}
44
45/// A commit message from a peer
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct CommitMessage {
48    pub peer_id: PeerId,
49    pub digest: Vec<u8>,
50    pub signature: Vec<u8>,
51}
52
53/// Result of a consensus attempt
54#[derive(Debug)]
55pub enum CommitResult {
56    Success(CommitProof),
57    InsufficientPrepares,
58    InsufficientCommits,
59}
60
/// Calculate the Byzantine quorum size for a federation of `n` nodes.
///
/// Returns `ceil(2n / 3)`. For the canonical PBFT sizing `n = 3f + 1`
/// (tolerating `f` Byzantine faults) this equals the familiar `2f + 1`;
/// for other values of `n` it is larger than `2f + 1`.
///
/// The value is computed as `n - n / 3`, which is arithmetically identical
/// to `(2 * n + 2) / 3` but cannot overflow for large `n`.
fn byzantine_threshold(n: usize) -> usize {
    n - n / 3
}
68
69/// Execute Byzantine fault-tolerant consensus on a state update
70///
71/// # PBFT Protocol
72///
73/// 1. **Pre-prepare**: Leader proposes update
74/// 2. **Prepare**: Nodes acknowledge receipt (2f+1 required)
75/// 3. **Commit**: Nodes commit to proposal (2f+1 required)
76/// 4. **Execute**: Update is applied with proof
77///
78/// # Implementation from PSEUDOCODE.md
79///
80/// ```pseudocode
81/// FUNCTION ByzantineCommit(update, federation):
82///     n = federation.node_count()
83///     f = (n - 1) / 3
84///     threshold = 2*f + 1
85///
86///     // Phase 1: Pre-prepare
87///     IF federation.is_leader():
88///         proposal = SignedProposal(update, sequence_number=NEXT_SEQ)
89///         Broadcast(federation.nodes, PrePrepare(proposal))
90///
91///     // Phase 2: Prepare
92///     pre_prepare = ReceivePrePrepare()
93///     IF ValidateProposal(pre_prepare):
94///         prepare_msg = Prepare(pre_prepare.digest, local_id)
95///         Broadcast(federation.nodes, prepare_msg)
96///
97///     prepares = CollectMessages(type=Prepare, count=threshold)
98///     IF len(prepares) < threshold:
99///         RETURN InsufficientPrepares
100///
101///     // Phase 3: Commit
102///     commit_msg = Commit(pre_prepare.digest, local_id)
103///     Broadcast(federation.nodes, commit_msg)
104///
105///     commits = CollectMessages(type=Commit, count=threshold)
106///     IF len(commits) >= threshold:
107///         federation.apply_update(update)
108///         proof = CommitProof(commits)
109///         RETURN Success(proof)
110///     ELSE:
111///         RETURN InsufficientCommits
112/// ```
113pub async fn byzantine_commit(
114    update: StateUpdate,
115    peer_count: usize,
116) -> Result<CommitProof> {
117    let n = peer_count;
118    let f = if n > 0 { (n - 1) / 3 } else { 0 };
119    let threshold = 2 * f + 1;
120
121    if n < 4 {
122        return Err(FederationError::InsufficientPeers {
123            needed: 4,
124            actual: n,
125        });
126    }
127
128    // Phase 1: Pre-prepare (leader proposes)
129    let sequence_number = get_next_sequence_number();
130    let proposal = SignedProposal {
131        update: update.clone(),
132        sequence_number,
133        signature: sign_proposal(&update),
134    };
135
136    // Broadcast pre-prepare (simulated)
137    let pre_prepare = ConsensusMessage::PrePrepare {
138        proposal: proposal.clone(),
139    };
140
141    // Phase 2: Prepare (nodes acknowledge)
142    let digest = compute_digest(&update);
143
144    // Simulate collecting prepare messages from peers
145    let prepares = simulate_prepare_phase(&digest, threshold)?;
146
147    if prepares.len() < threshold {
148        return Err(FederationError::ConsensusError(
149            format!("Insufficient prepares: got {}, needed {}", prepares.len(), threshold)
150        ));
151    }
152
153    // Phase 3: Commit (nodes commit)
154    let commit_messages = simulate_commit_phase(&digest, threshold)?;
155
156    if commit_messages.len() < threshold {
157        return Err(FederationError::ConsensusError(
158            format!("Insufficient commits: got {}, needed {}", commit_messages.len(), threshold)
159        ));
160    }
161
162    // Create proof
163    let proof = CommitProof {
164        update_id: update.update_id.clone(),
165        commit_messages,
166        timestamp: current_timestamp(),
167    };
168
169    // Verify proof
170    if !proof.verify(n) {
171        return Err(FederationError::ConsensusError(
172            "Proof verification failed".to_string()
173        ));
174    }
175
176    Ok(proof)
177}
178
179/// Compute digest of a state update
180fn compute_digest(update: &StateUpdate) -> Vec<u8> {
181    use sha2::{Sha256, Digest};
182    let mut hasher = Sha256::new();
183    hasher.update(&update.update_id);
184    hasher.update(&update.data);
185    hasher.update(&update.timestamp.to_le_bytes());
186    hasher.finalize().to_vec()
187}
188
189/// Sign a proposal (placeholder)
190fn sign_proposal(update: &StateUpdate) -> Vec<u8> {
191    use sha2::{Sha256, Digest};
192    let mut hasher = Sha256::new();
193    hasher.update(b"signature:");
194    hasher.update(&update.update_id);
195    hasher.finalize().to_vec()
196}
197
/// Hand out the next proposal sequence number (placeholder).
///
/// Numbers come from a process-wide atomic counter that starts at 1 and is
/// incremented on every call; the value returned is the one held before the
/// increment.
fn get_next_sequence_number() -> u64 {
    use std::sync::atomic;

    static NEXT_SEQUENCE: atomic::AtomicU64 = atomic::AtomicU64::new(1);

    NEXT_SEQUENCE.fetch_add(1, atomic::Ordering::SeqCst)
}
204
205/// Simulate prepare phase (placeholder for network communication)
206fn simulate_prepare_phase(
207    digest: &[u8],
208    threshold: usize,
209) -> Result<Vec<(PeerId, Vec<u8>)>> {
210    let mut prepares = Vec::new();
211
212    // Simulate receiving prepare messages from peers
213    for i in 0..threshold {
214        let peer_id = PeerId::new(format!("peer_{}", i));
215        prepares.push((peer_id, digest.to_vec()));
216    }
217
218    Ok(prepares)
219}
220
221/// Simulate commit phase (placeholder for network communication)
222fn simulate_commit_phase(
223    digest: &[u8],
224    threshold: usize,
225) -> Result<Vec<CommitMessage>> {
226    let mut commits = Vec::new();
227
228    // Simulate receiving commit messages from peers
229    for i in 0..threshold {
230        let peer_id = PeerId::new(format!("peer_{}", i));
231        let signature = sign_commit(digest, &peer_id);
232
233        commits.push(CommitMessage {
234            peer_id,
235            digest: digest.to_vec(),
236            signature,
237        });
238    }
239
240    Ok(commits)
241}
242
243/// Sign a commit message (placeholder)
244fn sign_commit(digest: &[u8], peer_id: &PeerId) -> Vec<u8> {
245    use sha2::{Sha256, Digest};
246    let mut hasher = Sha256::new();
247    hasher.update(b"commit:");
248    hasher.update(digest);
249    hasher.update(peer_id.0.as_bytes());
250    hasher.finalize().to_vec()
251}
252
/// Get the current wall-clock time in milliseconds since the Unix epoch.
///
/// This function never panics:
/// * if the system clock reports a time before the Unix epoch, `0` is
///   returned;
/// * if the millisecond count does not fit in a `u64`, it saturates at
///   `u64::MAX` instead of being silently truncated.
fn current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` yields a u128; clamp before narrowing so the cast is lossless.
        Ok(elapsed) => elapsed.as_millis().min(u128::from(u64::MAX)) as u64,
        // Clock is set before 1970: report the epoch itself rather than panic.
        Err(_) => 0,
    }
}
261
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `StateUpdate` with the given id and payload, stamped with the
    /// current time.
    fn update_fixture(id: &str, data: Vec<u8>) -> StateUpdate {
        StateUpdate {
            update_id: id.to_string(),
            data,
            timestamp: current_timestamp(),
        }
    }

    /// Builds a commit message for `peer` over the shared test digest `[1, 2, 3]`.
    fn commit_fixture(peer: &str, signature: Vec<u8>) -> CommitMessage {
        CommitMessage {
            peer_id: PeerId::new(peer.to_string()),
            digest: vec![1, 2, 3],
            signature,
        }
    }

    /// A four-node federation (n = 3f + 1 with f = 1) reaches consensus.
    #[tokio::test]
    async fn test_byzantine_commit_success() {
        let update = update_fixture("test_update_1", vec![1, 2, 3, 4]);

        let proof = byzantine_commit(update, 4).await.unwrap();

        assert!(proof.verify(4));
        assert_eq!(proof.update_id, "test_update_1");
    }

    /// Three nodes are too few for BFT and must be rejected up front.
    #[tokio::test]
    async fn test_byzantine_commit_insufficient_peers() {
        let update = update_fixture("test_update_2", vec![1, 2, 3]);

        let result = byzantine_commit(update, 3).await;

        assert!(result.is_err());
        if let Err(FederationError::InsufficientPeers { needed, actual }) = result {
            assert_eq!(needed, 4);
            assert_eq!(actual, 3);
        } else {
            panic!("Expected InsufficientPeers error");
        }
    }

    /// For n = 3f + 1 the threshold is 2f + 1.
    #[test]
    fn test_byzantine_threshold() {
        // (nodes, expected quorum) for f = 1, 2, 3
        let cases = [(4, 3), (7, 5), (10, 7)];
        for &(nodes, expected) in cases.iter() {
            assert_eq!(byzantine_threshold(nodes), expected);
        }
    }

    /// Three commits satisfy a 4-node federation but not a 7-node one.
    #[test]
    fn test_commit_proof_verification() {
        let commit_messages = vec![
            commit_fixture("peer1", vec![4, 5, 6]),
            commit_fixture("peer2", vec![7, 8, 9]),
            commit_fixture("peer3", vec![10, 11, 12]),
        ];
        let proof = CommitProof {
            update_id: "test".to_string(),
            commit_messages,
            timestamp: current_timestamp(),
        };

        // For 4 nodes, need 3 commits
        assert!(proof.verify(4));

        // For 7 nodes, would need 5 commits
        assert!(!proof.verify(7));
    }
}