1use std::{collections::HashMap, time::Duration};
8
9use crate::{
10 error::ConsensusError,
11 protos::consensus::v1::{Proposal, Vote},
12 scope_config::{NetworkType, ScopeConfig},
13 signing::ConsensusSignatureScheme,
14 types::SessionTransition,
15 utils::{
16 calculate_consensus_result, calculate_max_rounds, current_timestamp, validate_proposal,
17 validate_proposal_timestamp, validate_vote, validate_vote_chain,
18 },
19};
20
/// Tunable parameters that control how a consensus session counts rounds and
/// evaluates votes.
///
/// Values are normally obtained from `ConsensusConfig::p2p()`,
/// `ConsensusConfig::gossipsub()` or the `From<ScopeConfig>` /
/// `From<NetworkType>` conversions, and then adjusted with the `with_*`
/// builder methods.
#[derive(Debug, Clone)]
pub struct ConsensusConfig {
    /// Threshold passed to the consensus calculation and, in P2P mode, to the
    /// dynamic round-limit calculation.
    consensus_threshold: f64,
    /// Timeout attached to the configuration; exposed via `consensus_timeout()`.
    consensus_timeout: Duration,
    /// Upper bound used by the round-limit check. When `use_gossipsub_rounds`
    /// is `false`, a value of `0` means "derive the limit from the expected
    /// voter count" (see `max_round_limit`).
    max_rounds: u32,
    /// Selects gossipsub-style round tracking (the round only moves from 1 to
    /// 2) instead of advancing the round once per vote.
    use_gossipsub_rounds: bool,
    /// Liveness-criteria flag; stored here and exposed via
    /// `liveness_criteria()`.
    // NOTE(review): the session's consensus check in this file reads the flag
    // from the proposal, not from this field - confirm that is intended.
    liveness_criteria: bool,
}
45
46impl From<NetworkType> for ConsensusConfig {
47 fn from(network_type: NetworkType) -> Self {
48 ConsensusConfig::from(ScopeConfig::from(network_type))
49 }
50}
51
52impl From<ScopeConfig> for ConsensusConfig {
53 fn from(config: ScopeConfig) -> Self {
54 let (max_rounds, use_gossipsub_rounds) = match config.network_type {
55 NetworkType::Gossipsub => (config.max_rounds_override.unwrap_or(2), true),
56 NetworkType::P2P => (config.max_rounds_override.unwrap_or(0), false),
58 };
59
60 ConsensusConfig::new(
61 config.default_consensus_threshold,
62 config.default_timeout,
63 max_rounds,
64 use_gossipsub_rounds,
65 config.default_liveness_criteria_yes,
66 )
67 }
68}
69
70impl ConsensusConfig {
71 pub fn p2p() -> Self {
74 ConsensusConfig::from(NetworkType::P2P)
75 }
76
77 pub fn gossipsub() -> Self {
79 ConsensusConfig::from(NetworkType::Gossipsub)
80 }
81
82 pub fn with_timeout(mut self, consensus_timeout: Duration) -> Result<Self, ConsensusError> {
84 crate::utils::validate_timeout(consensus_timeout)?;
85 self.consensus_timeout = consensus_timeout;
86 Ok(self)
87 }
88
89 pub fn with_threshold(mut self, consensus_threshold: f64) -> Result<Self, ConsensusError> {
91 crate::utils::validate_threshold(consensus_threshold)?;
92 self.consensus_threshold = consensus_threshold;
93 Ok(self)
94 }
95
96 pub fn with_liveness_criteria(mut self, liveness_criteria: bool) -> Self {
98 self.liveness_criteria = liveness_criteria;
99 self
100 }
101
102 pub(crate) fn new(
105 consensus_threshold: f64,
106 consensus_timeout: Duration,
107 max_rounds: u32,
108 use_gossipsub_rounds: bool,
109 liveness_criteria: bool,
110 ) -> Self {
111 Self {
112 consensus_threshold,
113 consensus_timeout,
114 max_rounds,
115 use_gossipsub_rounds,
116 liveness_criteria,
117 }
118 }
119
120 fn max_round_limit(&self, expected_voters_count: u32) -> u32 {
121 if self.use_gossipsub_rounds {
122 self.max_rounds
123 } else if self.max_rounds == 0 {
124 calculate_max_rounds(expected_voters_count, self.consensus_threshold)
125 } else {
126 self.max_rounds
127 }
128 }
129
130 pub fn consensus_timeout(&self) -> Duration {
132 self.consensus_timeout
133 }
134
135 pub fn consensus_threshold(&self) -> f64 {
137 self.consensus_threshold
138 }
139
140 pub fn liveness_criteria(&self) -> bool {
142 self.liveness_criteria
143 }
144
145 pub fn max_rounds(&self) -> u32 {
147 self.max_rounds
148 }
149
150 pub fn use_gossipsub_rounds(&self) -> bool {
152 self.use_gossipsub_rounds
153 }
154}
155
/// Lifecycle state of a [`ConsensusSession`].
#[derive(Debug, Clone)]
pub enum ConsensusState {
    /// The session is still collecting votes; this is the state every new
    /// session starts in.
    Active,
    /// Consensus was reached; the payload is the boolean outcome returned by
    /// the consensus calculation.
    ConsensusReached(bool),
    /// The session failed. In this file the state is set when a round-limit
    /// or voter-count check rejects incoming votes.
    Failed,
}
165
/// A single proposal together with the votes collected for it and the
/// configuration used to evaluate them.
#[derive(Debug, Clone)]
pub struct ConsensusSession {
    /// The proposal being voted on. Accepted votes are appended to
    /// `proposal.votes`, and `proposal.round` is advanced as votes arrive.
    pub proposal: Proposal,
    /// Current lifecycle state of the session.
    pub state: ConsensusState,
    /// Accepted votes keyed by the vote's `vote_owner` bytes.
    pub votes: HashMap<Vec<u8>, Vote>,
    /// Value of `current_timestamp()` when the session was constructed, or
    /// `0` if that call failed.
    pub created_at: u64,
    /// Configuration used for round-limit and consensus checks.
    pub config: ConsensusConfig,
}
179
180impl ConsensusSession {
181 fn new(proposal: Proposal, config: ConsensusConfig) -> Self {
184 let now = current_timestamp().unwrap_or(0);
185 Self {
186 proposal,
187 state: ConsensusState::Active,
188 votes: HashMap::new(),
189 created_at: now,
190 config,
191 }
192 }
193
194 pub fn from_proposal<Signer: ConsensusSignatureScheme>(
198 proposal: Proposal,
199 config: ConsensusConfig,
200 ) -> Result<(Self, SessionTransition), ConsensusError> {
201 validate_proposal::<Signer>(&proposal)?;
202
203 let existing_votes = proposal.votes.clone();
205 let mut clean_proposal = proposal.clone();
206 clean_proposal.votes.clear();
207 clean_proposal.round = 1;
209
210 let mut session = Self::new(clean_proposal, config);
211 let transition = session.initialize_with_votes::<Signer>(
212 existing_votes,
213 proposal.expiration_timestamp,
214 proposal.timestamp,
215 )?;
216
217 Ok((session, transition))
218 }
219
220 pub(crate) fn add_vote(&mut self, vote: Vote) -> Result<SessionTransition, ConsensusError> {
222 match self.state {
223 ConsensusState::Active => {
224 validate_proposal_timestamp(self.proposal.expiration_timestamp)?;
225
226 self.check_round_limit(1)?;
228
229 if self.votes.contains_key(&vote.vote_owner) {
230 return Err(ConsensusError::DuplicateVote);
231 }
232 self.votes.insert(vote.vote_owner.clone(), vote.clone());
233 self.proposal.votes.push(vote.clone());
234
235 self.update_round(1);
236 Ok(self.check_consensus())
237 }
238 ConsensusState::ConsensusReached(res) => Ok(SessionTransition::ConsensusReached(res)),
239 _ => Err(ConsensusError::SessionNotActive),
240 }
241 }
242
243 pub(crate) fn initialize_with_votes<Signer: ConsensusSignatureScheme>(
246 &mut self,
247 votes: Vec<Vote>,
248 expiration_timestamp: u64,
249 creation_time: u64,
250 ) -> Result<SessionTransition, ConsensusError> {
251 if !matches!(self.state, ConsensusState::Active) {
252 return Err(ConsensusError::SessionNotActive);
253 }
254
255 validate_proposal_timestamp(expiration_timestamp)?;
256
257 if votes.is_empty() {
258 return Ok(SessionTransition::StillActive);
259 }
260
261 let mut seen_owners = std::collections::HashSet::new();
262 for vote in &votes {
263 if !seen_owners.insert(&vote.vote_owner) {
264 return Err(ConsensusError::DuplicateVote);
265 }
266 }
267
268 if votes.len() > self.proposal.expected_voters_count as usize {
271 self.state = ConsensusState::Failed;
272 return Err(ConsensusError::MaxRoundsExceeded);
273 }
274
275 validate_vote_chain(&votes)?;
276 for vote in &votes {
277 validate_vote::<Signer>(vote, expiration_timestamp, creation_time)?;
278 }
279
280 self.check_round_limit(votes.len())?;
281 self.update_round(votes.len());
282
283 for vote in votes {
284 self.votes.insert(vote.vote_owner.clone(), vote.clone());
285 self.proposal.votes.push(vote);
286 }
287
288 Ok(self.check_consensus())
289 }
290
291 fn check_round_limit(&mut self, vote_count: usize) -> Result<(), ConsensusError> {
298 if vote_count > self.proposal.expected_voters_count as usize {
300 self.state = ConsensusState::Failed;
301 return Err(ConsensusError::MaxRoundsExceeded);
302 }
303
304 let projected_value = if self.config.use_gossipsub_rounds {
306 if self.proposal.round == 2 || (self.proposal.round == 1 && vote_count > 0) {
311 2
312 } else {
313 self.proposal.round }
315 } else {
316 let current_votes = self.proposal.round.saturating_sub(1);
322 current_votes.saturating_add(vote_count as u32)
323 };
324
325 if projected_value
326 > self
327 .config
328 .max_round_limit(self.proposal.expected_voters_count)
329 {
330 self.state = ConsensusState::Failed;
331 return Err(ConsensusError::MaxRoundsExceeded);
332 }
333
334 Ok(())
335 }
336
337 fn update_round(&mut self, vote_count: usize) {
343 if self.config.use_gossipsub_rounds {
344 if self.proposal.round == 1 && vote_count > 0 {
349 self.proposal.round = 2;
350 }
351 } else {
352 self.proposal.round = self.proposal.round.saturating_add(vote_count as u32);
356 }
357 }
358
359 fn check_consensus(&mut self) -> SessionTransition {
364 let expected_voters = self.proposal.expected_voters_count;
365 let threshold = self.config.consensus_threshold;
366 let liveness = self.proposal.liveness_criteria_yes;
367
368 match calculate_consensus_result(&self.votes, expected_voters, threshold, liveness, false) {
369 Some(result) => {
370 self.state = ConsensusState::ConsensusReached(result);
371 SessionTransition::ConsensusReached(result)
372 }
373 None => {
374 self.state = ConsensusState::Active;
375 SessionTransition::StillActive
376 }
377 }
378 }
379
380 pub fn is_active(&self) -> bool {
382 matches!(self.state, ConsensusState::Active)
383 }
384
385 pub fn get_consensus_result(&self) -> Result<bool, ConsensusError> {
390 if let ConsensusState::ConsensusReached(result) = self.state {
391 Ok(result)
392 } else {
393 Err(ConsensusError::ConsensusNotReached)
394 }
395 }
396}
397
/// Unit tests for round tracking, round-limit enforcement, the configuration
/// builders and the non-active session paths.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use alloy::signers::local::PrivateKeySigner;

    use crate::{
        error::ConsensusError,
        session::{ConsensusConfig, ConsensusSession, ConsensusState},
        signing::EthereumConsensusSigner,
        types::CreateProposalRequest,
        utils::build_vote,
    };

    /// Wraps a raw private-key signer in the consensus signer type expected
    /// by `build_vote`.
    fn wrap(signer: PrivateKeySigner) -> EthereumConsensusSigner {
        EthereumConsensusSigner::new(signer)
    }

    /// With the gossipsub configuration the round moves to 2 on the first
    /// vote and stays there while all four votes are accepted.
    #[tokio::test]
    async fn enforce_max_rounds_gossipsub() {
        let signer1 = PrivateKeySigner::random();
        let signer2 = PrivateKeySigner::random();
        let signer3 = PrivateKeySigner::random();
        let signer4 = PrivateKeySigner::random();

        let request = CreateProposalRequest::new(
            "Test".into(),
            "".into(),
            signer1.address().as_slice().to_vec(),
            4,
            60,
            false,
        )
        .unwrap();

        let proposal = request.into_proposal().unwrap();
        let config = ConsensusConfig::gossipsub();
        let mut session = ConsensusSession::new(proposal, config);

        // First vote: round 1 -> 2.
        let vote1 = build_vote(&session.proposal, true, &wrap(signer1))
            .await
            .unwrap();
        session.add_vote(vote1).unwrap();
        assert_eq!(session.proposal.round, 2);

        // Subsequent votes must not advance the round any further.
        let vote2 = build_vote(&session.proposal, false, &wrap(signer2))
            .await
            .unwrap();
        session.add_vote(vote2).unwrap();
        assert_eq!(session.proposal.round, 2);

        let vote3 = build_vote(&session.proposal, true, &wrap(signer3))
            .await
            .unwrap();
        session.add_vote(vote3).unwrap();
        assert_eq!(session.proposal.round, 2);

        let vote4 = build_vote(&session.proposal, true, &wrap(signer4))
            .await
            .unwrap();
        session.add_vote(vote4).unwrap();
        assert_eq!(session.proposal.round, 2);
        assert_eq!(session.votes.len(), 4);
    }

    /// With the P2P configuration every accepted vote advances the round by
    /// one, and the fifth vote is rejected with `MaxRoundsExceeded`.
    #[tokio::test]
    async fn enforce_max_rounds_p2p() {
        let signer1 = PrivateKeySigner::random();
        let signer2 = PrivateKeySigner::random();
        let signer3 = PrivateKeySigner::random();
        let signer4 = PrivateKeySigner::random();
        let signer5 = PrivateKeySigner::random();

        let request = CreateProposalRequest::new(
            "Test".into(),
            "".into(),
            signer1.address().as_slice().to_vec(),
            5,
            60,
            false,
        )
        .unwrap();

        let proposal = request.into_proposal().unwrap();
        let config = ConsensusConfig::p2p();
        let mut session = ConsensusSession::new(proposal, config);

        let vote1 = build_vote(&session.proposal, true, &wrap(signer1))
            .await
            .unwrap();
        session.add_vote(vote1).unwrap();
        assert_eq!(session.proposal.round, 2);
        assert_eq!(session.votes.len(), 1);

        let vote2 = build_vote(&session.proposal, false, &wrap(signer2))
            .await
            .unwrap();
        session.add_vote(vote2).unwrap();
        assert_eq!(session.proposal.round, 3);
        assert_eq!(session.votes.len(), 2);

        let vote3 = build_vote(&session.proposal, true, &wrap(signer3))
            .await
            .unwrap();
        session.add_vote(vote3).unwrap();
        assert_eq!(session.proposal.round, 4);
        assert_eq!(session.votes.len(), 3);

        let vote4 = build_vote(&session.proposal, true, &wrap(signer4))
            .await
            .unwrap();
        session.add_vote(vote4).unwrap();
        assert_eq!(session.proposal.round, 5);
        assert_eq!(session.votes.len(), 4);

        // NOTE(review): presumably the dynamically computed limit for five
        // expected voters is 4, which is why the fifth vote is rejected.
        let vote5 = build_vote(&session.proposal, true, &wrap(signer5))
            .await
            .unwrap();
        let err = session.add_vote(vote5).unwrap_err();
        assert!(matches!(err, ConsensusError::MaxRoundsExceeded));
    }

    /// Exercises the `with_*` builders, their validation errors, the getters
    /// and an explicit non-zero round cap.
    #[test]
    fn consensus_config_builder_and_getters_cover_edges() {
        let cfg = ConsensusConfig::gossipsub()
            .with_threshold(0.75)
            .unwrap()
            .with_timeout(Duration::from_secs(42))
            .unwrap()
            .with_liveness_criteria(false);

        assert_eq!(cfg.consensus_threshold(), 0.75);
        assert_eq!(cfg.consensus_timeout(), Duration::from_secs(42));
        assert!(!cfg.liveness_criteria());

        let err = ConsensusConfig::gossipsub()
            .with_threshold(1.1)
            .unwrap_err();
        assert!(matches!(err, ConsensusError::InvalidConsensusThreshold));

        let err = ConsensusConfig::gossipsub()
            .with_timeout(Duration::from_secs(0))
            .unwrap_err();
        assert!(matches!(err, ConsensusError::InvalidTimeout));

        // A non-zero cap in non-gossipsub mode is used verbatim, regardless
        // of the expected voter count.
        let explicit = ConsensusConfig::new(2.0 / 3.0, Duration::from_secs(60), 7, false, true);
        assert_eq!(explicit.max_round_limit(100), 7);
    }

    /// `add_vote` rejects votes on a failed session and re-reports the result
    /// on a session that has already reached consensus.
    #[tokio::test]
    async fn add_vote_rejects_non_active_and_reports_reached_when_finalized() {
        let signer = PrivateKeySigner::random();
        let request = CreateProposalRequest::new(
            "Test".into(),
            "".into(),
            signer.address().as_slice().to_vec(),
            3,
            60,
            true,
        )
        .unwrap();
        let proposal = request.into_proposal().unwrap();

        // Failed session: the vote must be rejected.
        let mut failed_session =
            ConsensusSession::new(proposal.clone(), ConsensusConfig::gossipsub());
        failed_session.state = ConsensusState::Failed;
        let vote = build_vote(&failed_session.proposal, true, &wrap(signer.clone()))
            .await
            .unwrap();
        let err = failed_session.add_vote(vote).unwrap_err();
        assert!(matches!(err, ConsensusError::SessionNotActive));

        // Finalized session: the stored result is returned again.
        let mut finalized_session = ConsensusSession::new(proposal, ConsensusConfig::gossipsub());
        finalized_session.state = ConsensusState::ConsensusReached(true);
        let vote = build_vote(&finalized_session.proposal, true, &wrap(signer))
            .await
            .unwrap();
        let transition = finalized_session.add_vote(vote).unwrap();
        assert!(matches!(
            transition,
            crate::types::SessionTransition::ConsensusReached(true)
        ));
    }

    /// Covers three `initialize_with_votes` / `check_round_limit` paths: a
    /// non-active session, a batch with a duplicated owner, and a zero-vote
    /// round-limit check.
    #[tokio::test]
    async fn initialize_with_votes_non_active_duplicate_and_zero_votes_paths() {
        let signer = PrivateKeySigner::random();
        let request = CreateProposalRequest::new(
            "Test".into(),
            "".into(),
            signer.address().as_slice().to_vec(),
            4,
            60,
            true,
        )
        .unwrap();
        let proposal = request.into_proposal().unwrap();

        // Non-active session: rejected before anything else is looked at.
        let mut inactive = ConsensusSession::new(proposal.clone(), ConsensusConfig::gossipsub());
        inactive.state = ConsensusState::Failed;
        let err = inactive
            .initialize_with_votes::<EthereumConsensusSigner>(
                vec![],
                proposal.expiration_timestamp,
                proposal.timestamp,
            )
            .unwrap_err();
        assert!(matches!(err, ConsensusError::SessionNotActive));

        // Two votes signed by the same signer share an owner.
        let mut dup_session = ConsensusSession::new(proposal.clone(), ConsensusConfig::gossipsub());
        let vote1 = build_vote(&dup_session.proposal, true, &wrap(signer.clone()))
            .await
            .unwrap();
        let vote2 = build_vote(&dup_session.proposal, false, &wrap(signer))
            .await
            .unwrap();
        let err = dup_session
            .initialize_with_votes::<EthereumConsensusSigner>(
                vec![vote1, vote2],
                proposal.expiration_timestamp,
                proposal.timestamp,
            )
            .unwrap_err();
        assert!(matches!(err, ConsensusError::DuplicateVote));

        // A zero-vote check on a fresh gossipsub session must pass.
        let mut zero_votes = ConsensusSession::new(proposal, ConsensusConfig::gossipsub());
        zero_votes.check_round_limit(0).unwrap();
    }

    /// A vote count just above `u32::MAX` must be rejected by the round-limit
    /// check rather than slipping through.
    #[test]
    fn p2p_round_limit_should_reject_effectively_huge_vote_count() {
        // The count below cannot be represented when `usize` is 32 bits or
        // narrower, so the test is skipped there.
        if usize::BITS <= 32 {
            return;
        }

        let signer = PrivateKeySigner::random();
        let request = CreateProposalRequest::new(
            "TruncationTest".into(),
            vec![],
            signer.address().as_slice().to_vec(),
            1,
            60,
            true,
        )
        .unwrap();

        let proposal = request.into_proposal().unwrap();
        let mut session = ConsensusSession::new(proposal, ConsensusConfig::p2p());

        // One past `u32::MAX`.
        let wrapped_vote_count = (u32::MAX as usize) + 1;

        let result = session.check_round_limit(wrapped_vote_count);
        assert!(
            result.is_err(),
            "effectively huge vote_count should not pass round-limit checks"
        );
    }

    /// Applying `u32::MAX` votes in P2P mode must advance the round and
    /// saturate it at `u32::MAX`.
    #[test]
    fn p2p_update_round_should_advance_for_max_u32_vote_count() {
        let signer = PrivateKeySigner::random();
        let request = CreateProposalRequest::new(
            "RoundUpdateMax".into(),
            vec![],
            signer.address().as_slice().to_vec(),
            u32::MAX,
            60,
            true,
        )
        .unwrap();

        let proposal = request.into_proposal().unwrap();
        let mut session = ConsensusSession::new(proposal, ConsensusConfig::p2p());
        let starting_round = session.proposal.round;

        session.update_round(u32::MAX as usize);

        assert!(
            session.proposal.round > starting_round,
            "round should advance when max u32 vote_count is applied"
        );
        assert_eq!(session.proposal.round, u32::MAX);
    }
}