1use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use crate::{Result, FederationError, PeerId, StateUpdate};
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub enum ConsensusMessage {
16 PrePrepare { proposal: SignedProposal },
17 Prepare { digest: Vec<u8>, sender: PeerId },
18 Commit { digest: Vec<u8>, sender: PeerId },
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct SignedProposal {
24 pub update: StateUpdate,
25 pub sequence_number: u64,
26 pub signature: Vec<u8>,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct CommitProof {
32 pub update_id: String,
33 pub commit_messages: Vec<CommitMessage>,
34 pub timestamp: u64,
35}
36
37impl CommitProof {
38 pub fn verify(&self, total_nodes: usize) -> bool {
40 let threshold = byzantine_threshold(total_nodes);
41 self.commit_messages.len() >= threshold
42 }
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct CommitMessage {
48 pub peer_id: PeerId,
49 pub digest: Vec<u8>,
50 pub signature: Vec<u8>,
51}
52
53#[derive(Debug)]
55pub enum CommitResult {
56 Success(CommitProof),
57 InsufficientPrepares,
58 InsufficientCommits,
59}
60
61fn byzantine_threshold(n: usize) -> usize {
66 (2 * n + 2) / 3
67}
68
69pub async fn byzantine_commit(
114 update: StateUpdate,
115 peer_count: usize,
116) -> Result<CommitProof> {
117 let n = peer_count;
118 let f = if n > 0 { (n - 1) / 3 } else { 0 };
119 let threshold = 2 * f + 1;
120
121 if n < 4 {
122 return Err(FederationError::InsufficientPeers {
123 needed: 4,
124 actual: n,
125 });
126 }
127
128 let sequence_number = get_next_sequence_number();
130 let proposal = SignedProposal {
131 update: update.clone(),
132 sequence_number,
133 signature: sign_proposal(&update),
134 };
135
136 let pre_prepare = ConsensusMessage::PrePrepare {
138 proposal: proposal.clone(),
139 };
140
141 let digest = compute_digest(&update);
143
144 let prepares = simulate_prepare_phase(&digest, threshold)?;
146
147 if prepares.len() < threshold {
148 return Err(FederationError::ConsensusError(
149 format!("Insufficient prepares: got {}, needed {}", prepares.len(), threshold)
150 ));
151 }
152
153 let commit_messages = simulate_commit_phase(&digest, threshold)?;
155
156 if commit_messages.len() < threshold {
157 return Err(FederationError::ConsensusError(
158 format!("Insufficient commits: got {}, needed {}", commit_messages.len(), threshold)
159 ));
160 }
161
162 let proof = CommitProof {
164 update_id: update.update_id.clone(),
165 commit_messages,
166 timestamp: current_timestamp(),
167 };
168
169 if !proof.verify(n) {
171 return Err(FederationError::ConsensusError(
172 "Proof verification failed".to_string()
173 ));
174 }
175
176 Ok(proof)
177}
178
179fn compute_digest(update: &StateUpdate) -> Vec<u8> {
181 use sha2::{Sha256, Digest};
182 let mut hasher = Sha256::new();
183 hasher.update(&update.update_id);
184 hasher.update(&update.data);
185 hasher.update(&update.timestamp.to_le_bytes());
186 hasher.finalize().to_vec()
187}
188
189fn sign_proposal(update: &StateUpdate) -> Vec<u8> {
191 use sha2::{Sha256, Digest};
192 let mut hasher = Sha256::new();
193 hasher.update(b"signature:");
194 hasher.update(&update.update_id);
195 hasher.finalize().to_vec()
196}
197
198fn get_next_sequence_number() -> u64 {
200 use std::sync::atomic::{AtomicU64, Ordering};
201 static COUNTER: AtomicU64 = AtomicU64::new(1);
202 COUNTER.fetch_add(1, Ordering::SeqCst)
203}
204
205fn simulate_prepare_phase(
207 digest: &[u8],
208 threshold: usize,
209) -> Result<Vec<(PeerId, Vec<u8>)>> {
210 let mut prepares = Vec::new();
211
212 for i in 0..threshold {
214 let peer_id = PeerId::new(format!("peer_{}", i));
215 prepares.push((peer_id, digest.to_vec()));
216 }
217
218 Ok(prepares)
219}
220
221fn simulate_commit_phase(
223 digest: &[u8],
224 threshold: usize,
225) -> Result<Vec<CommitMessage>> {
226 let mut commits = Vec::new();
227
228 for i in 0..threshold {
230 let peer_id = PeerId::new(format!("peer_{}", i));
231 let signature = sign_commit(digest, &peer_id);
232
233 commits.push(CommitMessage {
234 peer_id,
235 digest: digest.to_vec(),
236 signature,
237 });
238 }
239
240 Ok(commits)
241}
242
243fn sign_commit(digest: &[u8], peer_id: &PeerId) -> Vec<u8> {
245 use sha2::{Sha256, Digest};
246 let mut hasher = Sha256::new();
247 hasher.update(b"commit:");
248 hasher.update(digest);
249 hasher.update(peer_id.0.as_bytes());
250 hasher.finalize().to_vec()
251}
252
253fn current_timestamp() -> u64 {
255 use std::time::{SystemTime, UNIX_EPOCH};
256 SystemTime::now()
257 .duration_since(UNIX_EPOCH)
258 .unwrap()
259 .as_millis() as u64
260}
261
262#[cfg(test)]
263mod tests {
264 use super::*;
265
266 #[tokio::test]
267 async fn test_byzantine_commit_success() {
268 let update = StateUpdate {
269 update_id: "test_update_1".to_string(),
270 data: vec![1, 2, 3, 4],
271 timestamp: current_timestamp(),
272 };
273
274 let proof = byzantine_commit(update, 4).await.unwrap();
276
277 assert!(proof.verify(4));
278 assert_eq!(proof.update_id, "test_update_1");
279 }
280
281 #[tokio::test]
282 async fn test_byzantine_commit_insufficient_peers() {
283 let update = StateUpdate {
284 update_id: "test_update_2".to_string(),
285 data: vec![1, 2, 3],
286 timestamp: current_timestamp(),
287 };
288
289 let result = byzantine_commit(update, 3).await;
291
292 assert!(result.is_err());
293 match result {
294 Err(FederationError::InsufficientPeers { needed, actual }) => {
295 assert_eq!(needed, 4);
296 assert_eq!(actual, 3);
297 }
298 _ => panic!("Expected InsufficientPeers error"),
299 }
300 }
301
302 #[test]
303 fn test_byzantine_threshold() {
304 assert_eq!(byzantine_threshold(4), 3); assert_eq!(byzantine_threshold(7), 5); assert_eq!(byzantine_threshold(10), 7); }
309
310 #[test]
311 fn test_commit_proof_verification() {
312 let proof = CommitProof {
313 update_id: "test".to_string(),
314 commit_messages: vec![
315 CommitMessage {
316 peer_id: PeerId::new("peer1".to_string()),
317 digest: vec![1, 2, 3],
318 signature: vec![4, 5, 6],
319 },
320 CommitMessage {
321 peer_id: PeerId::new("peer2".to_string()),
322 digest: vec![1, 2, 3],
323 signature: vec![7, 8, 9],
324 },
325 CommitMessage {
326 peer_id: PeerId::new("peer3".to_string()),
327 digest: vec![1, 2, 3],
328 signature: vec![10, 11, 12],
329 },
330 ],
331 timestamp: current_timestamp(),
332 };
333
334 assert!(proof.verify(4));
336
337 assert!(!proof.verify(7));
339 }
340}