1use std::{collections::HashMap, time::Duration};
8
9use crate::{
10 error::ConsensusError,
11 protos::consensus::v1::{Proposal, Vote},
12 scope_config::{NetworkType, ScopeConfig},
13 types::SessionTransition,
14 utils::{
15 calculate_consensus_result, calculate_max_rounds, current_timestamp, validate_proposal,
16 validate_proposal_timestamp, validate_vote, validate_vote_chain,
17 },
18};
19
/// Tunable parameters for a single consensus session.
///
/// Values are normally derived from a `ScopeConfig` (or a `NetworkType`) through
/// the `From` implementations and then adjusted with the `with_*` builders.
#[derive(Debug, Clone, PartialEq)]
pub struct ConsensusConfig {
    /// Threshold handed to the consensus calculation and to the dynamic
    /// round-limit calculation. Checked by `validate_threshold` when set
    /// through `with_threshold`.
    consensus_threshold: f64,
    /// Timeout associated with the session. Checked by `validate_timeout` when
    /// set through `with_timeout`.
    consensus_timeout: Duration,
    /// Configured round cap. When `use_gossipsub_rounds` is `false`, a value of
    /// `0` means "derive the cap from the expected voter count".
    max_rounds: u32,
    /// Selects the round-accounting scheme: `true` uses the two-round gossipsub
    /// scheme, `false` advances the round once per accepted vote.
    use_gossipsub_rounds: bool,
    /// Liveness flag copied from the scope's `default_liveness_criteria_yes`.
    // NOTE(review): the session's consensus check reads the flag stored on the
    // proposal, not this field — confirm where this value is consumed.
    liveness_criteria: bool,
}
44
45impl From<NetworkType> for ConsensusConfig {
46 fn from(network_type: NetworkType) -> Self {
47 ConsensusConfig::from(ScopeConfig::from(network_type))
48 }
49}
50
51impl From<ScopeConfig> for ConsensusConfig {
52 fn from(config: ScopeConfig) -> Self {
53 let (max_rounds, use_gossipsub_rounds) = match config.network_type {
54 NetworkType::Gossipsub => (config.max_rounds_override.unwrap_or(2), true),
55 NetworkType::P2P => (config.max_rounds_override.unwrap_or(0), false),
57 };
58
59 ConsensusConfig::new(
60 config.default_consensus_threshold,
61 config.default_timeout,
62 max_rounds,
63 use_gossipsub_rounds,
64 config.default_liveness_criteria_yes,
65 )
66 }
67}
68
69impl ConsensusConfig {
70 pub fn p2p() -> Self {
73 ConsensusConfig::from(NetworkType::P2P)
74 }
75
76 pub fn gossipsub() -> Self {
78 ConsensusConfig::from(NetworkType::Gossipsub)
79 }
80
81 pub fn with_timeout(mut self, consensus_timeout: Duration) -> Result<Self, ConsensusError> {
83 crate::utils::validate_timeout(consensus_timeout)?;
84 self.consensus_timeout = consensus_timeout;
85 Ok(self)
86 }
87
88 pub fn with_threshold(mut self, consensus_threshold: f64) -> Result<Self, ConsensusError> {
90 crate::utils::validate_threshold(consensus_threshold)?;
91 self.consensus_threshold = consensus_threshold;
92 Ok(self)
93 }
94
95 pub fn with_liveness_criteria(mut self, liveness_criteria: bool) -> Self {
97 self.liveness_criteria = liveness_criteria;
98 self
99 }
100
101 pub(crate) fn new(
104 consensus_threshold: f64,
105 consensus_timeout: Duration,
106 max_rounds: u32,
107 use_gossipsub_rounds: bool,
108 liveness_criteria: bool,
109 ) -> Self {
110 Self {
111 consensus_threshold,
112 consensus_timeout,
113 max_rounds,
114 use_gossipsub_rounds,
115 liveness_criteria,
116 }
117 }
118
119 fn max_round_limit(&self, expected_voters_count: u32) -> u32 {
120 if self.use_gossipsub_rounds {
121 self.max_rounds
122 } else if self.max_rounds == 0 {
123 calculate_max_rounds(expected_voters_count, self.consensus_threshold)
124 } else {
125 self.max_rounds
126 }
127 }
128
129 pub fn consensus_timeout(&self) -> Duration {
131 self.consensus_timeout
132 }
133
134 pub fn consensus_threshold(&self) -> f64 {
136 self.consensus_threshold
137 }
138
139 pub fn liveness_criteria(&self) -> bool {
141 self.liveness_criteria
142 }
143
144 pub fn max_rounds(&self) -> u32 {
146 self.max_rounds
147 }
148
149 pub fn use_gossipsub_rounds(&self) -> bool {
151 self.use_gossipsub_rounds
152 }
153}
154
/// Lifecycle state of a consensus session.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConsensusState {
    /// The session is still collecting votes.
    Active,
    /// Consensus was reached; the payload is the boolean outcome produced by
    /// the consensus calculation.
    ConsensusReached(bool),
    /// The session was aborted, e.g. because a round or vote-count limit was
    /// exceeded.
    Failed,
}
164
165#[derive(Debug, Clone)]
166pub struct ConsensusSession {
167 pub proposal: Proposal,
169 pub state: ConsensusState,
171 pub votes: HashMap<Vec<u8>, Vote>, pub created_at: u64,
175 pub config: ConsensusConfig,
177}
178
179impl ConsensusSession {
180 fn new(proposal: Proposal, config: ConsensusConfig) -> Self {
183 let now = current_timestamp().unwrap_or(0);
184 Self {
185 proposal,
186 state: ConsensusState::Active,
187 votes: HashMap::new(),
188 created_at: now,
189 config,
190 }
191 }
192
193 pub fn from_proposal(
197 proposal: Proposal,
198 config: ConsensusConfig,
199 ) -> Result<(Self, SessionTransition), ConsensusError> {
200 validate_proposal(&proposal)?;
201
202 let existing_votes = proposal.votes.clone();
204 let mut clean_proposal = proposal.clone();
205 clean_proposal.votes.clear();
206 clean_proposal.round = 1;
208
209 let mut session = Self::new(clean_proposal, config);
210 let transition = session.initialize_with_votes(
211 existing_votes,
212 proposal.expiration_timestamp,
213 proposal.timestamp,
214 )?;
215
216 Ok((session, transition))
217 }
218
219 pub(crate) fn add_vote(&mut self, vote: Vote) -> Result<SessionTransition, ConsensusError> {
221 match self.state {
222 ConsensusState::Active => {
223 validate_proposal_timestamp(self.proposal.expiration_timestamp)?;
224
225 self.check_round_limit(1)?;
227
228 if self.votes.contains_key(&vote.vote_owner) {
229 return Err(ConsensusError::DuplicateVote);
230 }
231 self.votes.insert(vote.vote_owner.clone(), vote.clone());
232 self.proposal.votes.push(vote.clone());
233
234 self.update_round(1);
235 Ok(self.check_consensus())
236 }
237 ConsensusState::ConsensusReached(res) => Ok(SessionTransition::ConsensusReached(res)),
238 _ => Err(ConsensusError::SessionNotActive),
239 }
240 }
241
242 pub(crate) fn initialize_with_votes(
245 &mut self,
246 votes: Vec<Vote>,
247 expiration_timestamp: u64,
248 creation_time: u64,
249 ) -> Result<SessionTransition, ConsensusError> {
250 if !matches!(self.state, ConsensusState::Active) {
251 return Err(ConsensusError::SessionNotActive);
252 }
253
254 validate_proposal_timestamp(expiration_timestamp)?;
255
256 if votes.is_empty() {
257 return Ok(SessionTransition::StillActive);
258 }
259
260 let mut seen_owners = std::collections::HashSet::new();
261 for vote in &votes {
262 if !seen_owners.insert(&vote.vote_owner) {
263 return Err(ConsensusError::DuplicateVote);
264 }
265 }
266
267 if votes.len() > self.proposal.expected_voters_count as usize {
270 self.state = ConsensusState::Failed;
271 return Err(ConsensusError::MaxRoundsExceeded);
272 }
273
274 validate_vote_chain(&votes)?;
275 for vote in &votes {
276 validate_vote(vote, expiration_timestamp, creation_time)?;
277 }
278
279 self.check_round_limit(votes.len())?;
280 self.update_round(votes.len());
281
282 for vote in votes {
283 self.votes.insert(vote.vote_owner.clone(), vote.clone());
284 self.proposal.votes.push(vote);
285 }
286
287 Ok(self.check_consensus())
288 }
289
290 fn check_round_limit(&mut self, vote_count: usize) -> Result<(), ConsensusError> {
297 if vote_count > self.proposal.expected_voters_count as usize {
299 self.state = ConsensusState::Failed;
300 return Err(ConsensusError::MaxRoundsExceeded);
301 }
302
303 let projected_value = if self.config.use_gossipsub_rounds {
305 if self.proposal.round == 2 || (self.proposal.round == 1 && vote_count > 0) {
310 2
311 } else {
312 self.proposal.round }
314 } else {
315 let current_votes = self.proposal.round.saturating_sub(1);
321 current_votes.saturating_add(vote_count as u32)
322 };
323
324 if projected_value
325 > self
326 .config
327 .max_round_limit(self.proposal.expected_voters_count)
328 {
329 self.state = ConsensusState::Failed;
330 return Err(ConsensusError::MaxRoundsExceeded);
331 }
332
333 Ok(())
334 }
335
336 fn update_round(&mut self, vote_count: usize) {
342 if self.config.use_gossipsub_rounds {
343 if self.proposal.round == 1 && vote_count > 0 {
348 self.proposal.round = 2;
349 }
350 } else {
351 self.proposal.round = self.proposal.round.saturating_add(vote_count as u32);
355 }
356 }
357
358 fn check_consensus(&mut self) -> SessionTransition {
363 let expected_voters = self.proposal.expected_voters_count;
364 let threshold = self.config.consensus_threshold;
365 let liveness = self.proposal.liveness_criteria_yes;
366
367 match calculate_consensus_result(&self.votes, expected_voters, threshold, liveness, false) {
368 Some(result) => {
369 self.state = ConsensusState::ConsensusReached(result);
370 SessionTransition::ConsensusReached(result)
371 }
372 None => {
373 self.state = ConsensusState::Active;
374 SessionTransition::StillActive
375 }
376 }
377 }
378
379 pub fn is_active(&self) -> bool {
381 matches!(self.state, ConsensusState::Active)
382 }
383
384 pub fn get_consensus_result(&self) -> Result<bool, ConsensusError> {
389 if let ConsensusState::ConsensusReached(result) = self.state {
390 Ok(result)
391 } else {
392 Err(ConsensusError::ConsensusNotReached)
393 }
394 }
395}
396
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use alloy::signers::local::PrivateKeySigner;

    use crate::{
        error::ConsensusError,
        protos::consensus::v1::Proposal,
        session::{ConsensusConfig, ConsensusSession, ConsensusState},
        types::CreateProposalRequest,
        utils::build_vote,
    };

    /// Builds a proposal owned by `owner` with an empty payload and `60` as the
    /// expiration argument, mirroring the request every test in this module uses.
    fn proposal_owned_by(
        owner: &PrivateKeySigner,
        name: &str,
        expected_voters: u32,
        liveness: bool,
    ) -> Proposal {
        CreateProposalRequest::new(
            name.into(),
            Vec::new(),
            owner.address().as_slice().to_vec(),
            expected_voters,
            60,
            liveness,
        )
        .unwrap()
        .into_proposal()
        .unwrap()
    }

    /// Generates `count` fresh random signers.
    fn random_signers(count: usize) -> Vec<PrivateKeySigner> {
        (0..count).map(|_| PrivateKeySigner::random()).collect()
    }

    #[tokio::test]
    async fn enforce_max_rounds_gossipsub() {
        let signers = random_signers(4);
        let proposal = proposal_owned_by(&signers[0], "Test", 4, false);
        let mut session = ConsensusSession::new(proposal, ConsensusConfig::gossipsub());

        // In gossipsub mode every accepted vote leaves the session in round 2.
        let choices = vec![true, false, true, true];
        for (signer, choice) in signers.into_iter().zip(choices) {
            let vote = build_vote(&session.proposal, choice, signer).await.unwrap();
            session.add_vote(vote).unwrap();
            assert_eq!(session.proposal.round, 2);
        }

        assert_eq!(session.votes.len(), 4);
    }

    #[tokio::test]
    async fn enforce_max_rounds_p2p() {
        let mut signers = random_signers(5);
        let proposal = proposal_owned_by(&signers[0], "Test", 5, false);
        let mut session = ConsensusSession::new(proposal, ConsensusConfig::p2p());

        // The fifth signer is kept aside: its vote must be rejected.
        let rejected_signer = signers.pop().expect("five signers were generated");

        // (choice, round after the vote, number of votes after the vote)
        let accepted_steps = vec![
            (true, 2u32, 1usize),
            (false, 3, 2),
            (true, 4, 3),
            (true, 5, 4),
        ];
        for (signer, (choice, round_after, votes_after)) in signers.into_iter().zip(accepted_steps)
        {
            let vote = build_vote(&session.proposal, choice, signer).await.unwrap();
            session.add_vote(vote).unwrap();
            assert_eq!(session.proposal.round, round_after);
            assert_eq!(session.votes.len(), votes_after);
        }

        let late_vote = build_vote(&session.proposal, true, rejected_signer)
            .await
            .unwrap();
        let err = session.add_vote(late_vote).unwrap_err();
        assert!(matches!(err, ConsensusError::MaxRoundsExceeded));
    }

    #[test]
    fn consensus_config_builder_and_getters_cover_edges() {
        let timeout = Duration::from_secs(42);
        let cfg = ConsensusConfig::gossipsub()
            .with_threshold(0.75)
            .unwrap()
            .with_timeout(timeout)
            .unwrap()
            .with_liveness_criteria(false);

        assert_eq!(cfg.consensus_threshold(), 0.75);
        assert_eq!(cfg.consensus_timeout(), Duration::from_secs(42));
        assert!(!cfg.liveness_criteria());

        let threshold_err = ConsensusConfig::gossipsub()
            .with_threshold(1.1)
            .unwrap_err();
        assert!(matches!(
            threshold_err,
            ConsensusError::InvalidConsensusThreshold
        ));

        let timeout_err = ConsensusConfig::gossipsub()
            .with_timeout(Duration::from_secs(0))
            .unwrap_err();
        assert!(matches!(timeout_err, ConsensusError::InvalidTimeout));

        // An explicit non-zero cap in per-vote mode is used verbatim.
        let explicit = ConsensusConfig::new(2.0 / 3.0, Duration::from_secs(60), 7, false, true);
        assert_eq!(explicit.max_round_limit(100), 7);
    }

    #[tokio::test]
    async fn add_vote_rejects_non_active_and_reports_reached_when_finalized() {
        let signer = PrivateKeySigner::random();
        let proposal = proposal_owned_by(&signer, "Test", 3, true);

        // A failed session refuses further votes.
        let mut failed_session =
            ConsensusSession::new(proposal.clone(), ConsensusConfig::gossipsub());
        failed_session.state = ConsensusState::Failed;
        let vote = build_vote(&failed_session.proposal, true, signer.clone())
            .await
            .unwrap();
        let err = failed_session.add_vote(vote).unwrap_err();
        assert!(matches!(err, ConsensusError::SessionNotActive));

        // A finalized session echoes its stored outcome.
        let mut finalized_session = ConsensusSession::new(proposal, ConsensusConfig::gossipsub());
        finalized_session.state = ConsensusState::ConsensusReached(true);
        let vote = build_vote(&finalized_session.proposal, true, signer)
            .await
            .unwrap();
        let transition = finalized_session.add_vote(vote).unwrap();
        assert!(matches!(
            transition,
            crate::types::SessionTransition::ConsensusReached(true)
        ));
    }

    #[tokio::test]
    async fn initialize_with_votes_non_active_duplicate_and_zero_votes_paths() {
        let signer = PrivateKeySigner::random();
        let proposal = proposal_owned_by(&signer, "Test", 4, true);
        let expiration = proposal.expiration_timestamp;
        let created = proposal.timestamp;

        // Non-active sessions are rejected before anything else is checked.
        let mut inactive = ConsensusSession::new(proposal.clone(), ConsensusConfig::gossipsub());
        inactive.state = ConsensusState::Failed;
        let err = inactive
            .initialize_with_votes(vec![], expiration, created)
            .unwrap_err();
        assert!(matches!(err, ConsensusError::SessionNotActive));

        // Two votes from the same owner in one batch are a duplicate.
        let mut dup_session = ConsensusSession::new(proposal.clone(), ConsensusConfig::gossipsub());
        let first = build_vote(&dup_session.proposal, true, signer.clone())
            .await
            .unwrap();
        let second = build_vote(&dup_session.proposal, false, signer)
            .await
            .unwrap();
        let err = dup_session
            .initialize_with_votes(vec![first, second], expiration, created)
            .unwrap_err();
        assert!(matches!(err, ConsensusError::DuplicateVote));

        // Zero additional votes never trips the round limit on a fresh session.
        let mut zero_votes = ConsensusSession::new(proposal, ConsensusConfig::gossipsub());
        zero_votes.check_round_limit(0).unwrap();
    }

    #[test]
    fn p2p_round_limit_should_reject_effectively_huge_vote_count() {
        // The scenario needs a `usize` wider than `u32`.
        if usize::BITS <= 32 {
            return;
        }

        let signer = PrivateKeySigner::random();
        let proposal = proposal_owned_by(&signer, "TruncationTest", 1, true);
        let mut session = ConsensusSession::new(proposal, ConsensusConfig::p2p());

        let wrapped_vote_count = (u32::MAX as usize) + 1;
        let result = session.check_round_limit(wrapped_vote_count);

        assert!(
            result.is_err(),
            "effectively huge vote_count should not pass round-limit checks"
        );
    }

    #[test]
    fn p2p_update_round_should_advance_for_max_u32_vote_count() {
        let signer = PrivateKeySigner::random();
        let proposal = proposal_owned_by(&signer, "RoundUpdateMax", u32::MAX, true);
        let mut session = ConsensusSession::new(proposal, ConsensusConfig::p2p());
        let starting_round = session.proposal.round;

        session.update_round(u32::MAX as usize);

        assert!(
            session.proposal.round > starting_round,
            "round should advance when max u32 vote_count is applied"
        );
        assert_eq!(session.proposal.round, u32::MAX);
    }
}