Skip to main content

hashgraph_like_consensus/
session.rs

1//! Consensus session and configuration types.
2//!
3//! A [`ConsensusSession`] tracks the lifecycle of a single proposal — from creation
4//! through vote collection to a final [`ConsensusState`]. Each session carries its
5//! own [`ConsensusConfig`] that governs thresholds, timeouts, and round limits.
6
7use std::{collections::HashMap, time::Duration};
8
9use crate::{
10    error::ConsensusError,
11    protos::consensus::v1::{Proposal, Vote},
12    scope_config::{NetworkType, ScopeConfig},
13    types::SessionTransition,
14    utils::{
15        calculate_consensus_result, calculate_max_rounds, current_timestamp, validate_proposal,
16        validate_proposal_timestamp, validate_vote, validate_vote_chain,
17    },
18};
19
20/// Per-session configuration controlling how consensus is reached.
21///
22/// Use [`ConsensusConfig::gossipsub()`] or [`ConsensusConfig::p2p()`] for sensible
23/// defaults, then refine with builder methods like [`with_timeout`](Self::with_timeout)
24/// and [`with_threshold`](Self::with_threshold).
25#[derive(Debug, Clone)]
26pub struct ConsensusConfig {
27    /// What fraction of expected voters must vote before consensus can be reached (default: 2/3).
28    consensus_threshold: f64,
29    /// How long to wait before timing out if consensus isn't reached.
30    consensus_timeout: Duration,
31    /// Maximum number of voting rounds (vote increments) before giving up.
32    ///
33    /// Creation starts at round 1, so this caps the number of votes that can be processed.
34    /// Default (gossipsub) is 2 rounds; for P2P flows derive ceil(2n/3) via `ConsensusConfig::p2p()`.
35    max_rounds: u32,
36    /// Enable automatic two-round limit to mirror gossipsub behavior.
37    ///
38    /// When true, max_rounds limits the round number (round 1 = owner vote, round 2 = all other votes).
39    /// When false, max_rounds limits the vote count (each vote increments round).
40    use_gossipsub_rounds: bool,
41    /// Whether to apply liveness criteria for silent peers (count silent as YES/NO depending on this flag).
42    liveness_criteria: bool,
43}
44
45impl From<NetworkType> for ConsensusConfig {
46    fn from(network_type: NetworkType) -> Self {
47        ConsensusConfig::from(ScopeConfig::from(network_type))
48    }
49}
50
51impl From<ScopeConfig> for ConsensusConfig {
52    fn from(config: ScopeConfig) -> Self {
53        let (max_rounds, use_gossipsub_rounds) = match config.network_type {
54            NetworkType::Gossipsub => (config.max_rounds_override.unwrap_or(2), true),
55            // 0 triggers dynamic calculation for P2P networks
56            NetworkType::P2P => (config.max_rounds_override.unwrap_or(0), false),
57        };
58
59        ConsensusConfig::new(
60            config.default_consensus_threshold,
61            config.default_timeout,
62            max_rounds,
63            use_gossipsub_rounds,
64            config.default_liveness_criteria_yes,
65        )
66    }
67}
68
69impl ConsensusConfig {
70    /// Default configuration for P2P transport: derive round cap as ceil(2n/3).
71    /// Max rounds is 0, so the round cap is calculated dynamically based on the expected voters count.
72    pub fn p2p() -> Self {
73        ConsensusConfig::from(NetworkType::P2P)
74    }
75
76    /// Default configuration for Gossipsub transport: fixed 2-round flow.
77    pub fn gossipsub() -> Self {
78        ConsensusConfig::from(NetworkType::Gossipsub)
79    }
80
81    /// Set consensus timeout (validated) and return the updated config.
82    pub fn with_timeout(mut self, consensus_timeout: Duration) -> Result<Self, ConsensusError> {
83        crate::utils::validate_timeout(consensus_timeout)?;
84        self.consensus_timeout = consensus_timeout;
85        Ok(self)
86    }
87
88    /// Set consensus threshold (validated) and return the updated config.
89    pub fn with_threshold(mut self, consensus_threshold: f64) -> Result<Self, ConsensusError> {
90        crate::utils::validate_threshold(consensus_threshold)?;
91        self.consensus_threshold = consensus_threshold;
92        Ok(self)
93    }
94
95    /// Set liveness criteria and return the updated config.
96    pub fn with_liveness_criteria(mut self, liveness_criteria: bool) -> Self {
97        self.liveness_criteria = liveness_criteria;
98        self
99    }
100
101    /// Create a new ConsensusConfig with the given values.
102    /// This is used internally for scope configuration conversion.
103    pub(crate) fn new(
104        consensus_threshold: f64,
105        consensus_timeout: Duration,
106        max_rounds: u32,
107        use_gossipsub_rounds: bool,
108        liveness_criteria: bool,
109    ) -> Self {
110        Self {
111            consensus_threshold,
112            consensus_timeout,
113            max_rounds,
114            use_gossipsub_rounds,
115            liveness_criteria,
116        }
117    }
118
119    fn max_round_limit(&self, expected_voters_count: u32) -> u32 {
120        if self.use_gossipsub_rounds {
121            self.max_rounds
122        } else if self.max_rounds == 0 {
123            calculate_max_rounds(expected_voters_count, self.consensus_threshold)
124        } else {
125            self.max_rounds
126        }
127    }
128
129    /// Maximum time to wait for consensus before timing out.
130    pub fn consensus_timeout(&self) -> Duration {
131        self.consensus_timeout
132    }
133
134    /// Fraction of expected voters required before consensus can be determined.
135    pub fn consensus_threshold(&self) -> f64 {
136        self.consensus_threshold
137    }
138
139    /// Whether silent peers are counted as YES (`true`) or NO (`false`).
140    pub fn liveness_criteria(&self) -> bool {
141        self.liveness_criteria
142    }
143
144    /// Maximum number of voting rounds allowed.
145    pub fn max_rounds(&self) -> u32 {
146        self.max_rounds
147    }
148
149    /// Whether Gossipsub-style fixed 2-round semantics are in effect.
150    pub fn use_gossipsub_rounds(&self) -> bool {
151        self.use_gossipsub_rounds
152    }
153}
154
155#[derive(Debug, Clone)]
156pub enum ConsensusState {
157    /// Votes still accepted.
158    Active,
159    /// Voting closed with a boolean result.
160    ConsensusReached(bool),
161    /// Consensus could not be determined (typically on timeout with insufficient votes).
162    Failed,
163}
164
165#[derive(Debug, Clone)]
166pub struct ConsensusSession {
167    /// Current snapshot of the proposal including aggregated votes.
168    pub proposal: Proposal,
169    /// Session state tracking whether voting is still open.
170    pub state: ConsensusState,
171    /// Map of vote owner -> vote to enforce single vote per participant.
172    pub votes: HashMap<Vec<u8>, Vote>, // vote_owner -> Vote
173    /// Seconds since Unix epoch when the session was created.
174    pub created_at: u64,
175    /// Per-session runtime configuration.
176    pub config: ConsensusConfig,
177}
178
179impl ConsensusSession {
180    /// Create a new session from a validated proposal (no votes).
181    /// Used when creating proposals locally where we know the proposal is clean.
182    fn new(proposal: Proposal, config: ConsensusConfig) -> Self {
183        let now = current_timestamp().unwrap_or(0);
184        Self {
185            proposal,
186            state: ConsensusState::Active,
187            votes: HashMap::new(),
188            created_at: now,
189            config,
190        }
191    }
192
193    /// Create a session from a proposal, validating the proposal and all votes.
194    /// This validates the proposal structure, vote chain, and individual votes before creating the session.
195    /// The session is created with votes already processed and rounds correctly set.
196    pub fn from_proposal(
197        proposal: Proposal,
198        config: ConsensusConfig,
199    ) -> Result<(Self, SessionTransition), ConsensusError> {
200        validate_proposal(&proposal)?;
201
202        // Create clean proposal for session (votes will be added via initialize_with_votes)
203        let existing_votes = proposal.votes.clone();
204        let mut clean_proposal = proposal.clone();
205        clean_proposal.votes.clear();
206        // Always start with round 1 for new proposals as we at least have the proposal owner's vote.
207        clean_proposal.round = 1;
208
209        let mut session = Self::new(clean_proposal, config);
210        let transition = session.initialize_with_votes(
211            existing_votes,
212            proposal.expiration_timestamp,
213            proposal.timestamp,
214        )?;
215
216        Ok((session, transition))
217    }
218
219    /// Add a vote to the session.
220    pub(crate) fn add_vote(&mut self, vote: Vote) -> Result<SessionTransition, ConsensusError> {
221        match self.state {
222            ConsensusState::Active => {
223                validate_proposal_timestamp(self.proposal.expiration_timestamp)?;
224
225                // Check if adding this vote would exceed round limits
226                self.check_round_limit(1)?;
227
228                if self.votes.contains_key(&vote.vote_owner) {
229                    return Err(ConsensusError::DuplicateVote);
230                }
231                self.votes.insert(vote.vote_owner.clone(), vote.clone());
232                self.proposal.votes.push(vote.clone());
233
234                self.update_round(1);
235                Ok(self.check_consensus())
236            }
237            ConsensusState::ConsensusReached(res) => Ok(SessionTransition::ConsensusReached(res)),
238            _ => Err(ConsensusError::SessionNotActive),
239        }
240    }
241
242    /// Initialize session with multiple votes, validating all before adding any.
243    /// Validates duplicates, vote chain, and individual votes, then adds all atomically.
244    pub(crate) fn initialize_with_votes(
245        &mut self,
246        votes: Vec<Vote>,
247        expiration_timestamp: u64,
248        creation_time: u64,
249    ) -> Result<SessionTransition, ConsensusError> {
250        if !matches!(self.state, ConsensusState::Active) {
251            return Err(ConsensusError::SessionNotActive);
252        }
253
254        validate_proposal_timestamp(expiration_timestamp)?;
255
256        if votes.is_empty() {
257            return Ok(SessionTransition::StillActive);
258        }
259
260        let mut seen_owners = std::collections::HashSet::new();
261        for vote in &votes {
262            if !seen_owners.insert(&vote.vote_owner) {
263                return Err(ConsensusError::DuplicateVote);
264            }
265        }
266
267        validate_vote_chain(&votes)?;
268        for vote in &votes {
269            validate_vote(vote, expiration_timestamp, creation_time)?;
270        }
271
272        self.check_round_limit(votes.len())?;
273        self.update_round(votes.len());
274
275        for vote in votes {
276            self.votes.insert(vote.vote_owner.clone(), vote.clone());
277            self.proposal.votes.push(vote);
278        }
279
280        Ok(self.check_consensus())
281    }
282
283    /// Check if adding votes would exceed round limits.
284    ///
285    /// Unifies logic for both single-vote and batch processing:
286    /// - For a single vote, pass `vote_count: 1`.
287    /// - For P2P: Calculates `(current_round - 1) + vote_count`.
288    /// - For Gossipsub: Moves to Round 2 if `vote_count > 0`.
289    fn check_round_limit(&mut self, vote_count: usize) -> Result<(), ConsensusError> {
290        // Determine the value to compare against the limit based on configuration
291        let projected_value = if self.config.use_gossipsub_rounds {
292            // Gossipsub Logic:
293            // RFC Section 2.5.3: Round 1 = proposal, Round 2 = all parallel votes.
294            // If we are already at Round 2, we stay there.
295            // If we are at Round 1 and adding ANY votes (> 0), we move to Round 2.
296            if self.proposal.round == 2 || (self.proposal.round == 1 && vote_count > 0) {
297                2
298            } else {
299                self.proposal.round // Stays at 1 if vote_count is 0, or handles edge cases
300            }
301        } else {
302            // P2P Logic:
303            // RFC Section 2.5.3: Round increments per vote.
304            // Current existing votes = round - 1.
305            // Projected total = Existing votes + New votes.
306            let current_votes = self.proposal.round.saturating_sub(1);
307            current_votes.saturating_add(vote_count as u32)
308        };
309
310        if projected_value
311            > self
312                .config
313                .max_round_limit(self.proposal.expected_voters_count)
314        {
315            self.state = ConsensusState::Failed;
316            return Err(ConsensusError::MaxRoundsExceeded);
317        }
318
319        Ok(())
320    }
321
322    /// Update round after adding votes.
323    ///
324    /// Unifies logic for round updates:
325    /// - Gossipsub: Moves from Round 1 -> 2 if adding votes. Stays at 2 otherwise.
326    /// - P2P: Adds the number of votes to the current round.
327    fn update_round(&mut self, vote_count: usize) {
328        if self.config.use_gossipsub_rounds {
329            // RFC Section 2.5.3: Gossipsub
330            // Round 1 = proposal creation.
331            // Round 2 = all subsequent votes.
332            // If we are at Round 1 and add ANY votes (>0), we promote to Round 2.
333            if self.proposal.round == 1 && vote_count > 0 {
334                self.proposal.round = 2;
335            }
336        } else {
337            // RFC Section 2.5.3: P2P
338            // Round increments for every vote added.
339            self.proposal.round = self.proposal.round.saturating_add(vote_count as u32);
340        }
341    }
342
343    /// RFC Section 4 (Liveness): Check if consensus reached
344    /// - n > 2: need >n/2 YES votes among at least 2n/3 distinct peers
345    /// - n ≤ 2: require unanimous YES votes
346    /// - Equality: use liveness_criteria_yes
347    fn check_consensus(&mut self) -> SessionTransition {
348        let expected_voters = self.proposal.expected_voters_count;
349        let threshold = self.config.consensus_threshold;
350        let liveness = self.proposal.liveness_criteria_yes;
351
352        match calculate_consensus_result(&self.votes, expected_voters, threshold, liveness) {
353            Some(result) => {
354                self.state = ConsensusState::ConsensusReached(result);
355                SessionTransition::ConsensusReached(result)
356            }
357            None => {
358                self.state = ConsensusState::Active;
359                SessionTransition::StillActive
360            }
361        }
362    }
363
364    /// Check if this proposal is still accepting votes.
365    pub fn is_active(&self) -> bool {
366        matches!(self.state, ConsensusState::Active)
367    }
368
369    /// Get the consensus result if one has been reached.
370    ///
371    /// Returns `Ok(true)` for YES, `Ok(false)` for NO, or `Err(ConsensusError::ConsensusNotReached)` if consensus
372    /// hasn't been reached yet.
373    pub fn get_consensus_result(&self) -> Result<bool, ConsensusError> {
374        if let ConsensusState::ConsensusReached(result) = self.state {
375            Ok(result)
376        } else {
377            Err(ConsensusError::ConsensusNotReached)
378        }
379    }
380}
381
382#[cfg(test)]
383mod tests {
384    use alloy::signers::local::PrivateKeySigner;
385
386    use crate::{
387        error::ConsensusError,
388        session::{ConsensusConfig, ConsensusSession},
389        types::CreateProposalRequest,
390        utils::build_vote,
391    };
392
393    #[tokio::test]
394    async fn enforce_max_rounds_gossipsub() {
395        // Gossipsub: max_rounds = 2 means round 1 (proposal) and round 2 (all votes)
396        // Should allow multiple votes in round 2, but not exceed round 2
397        let signer1 = PrivateKeySigner::random();
398        let signer2 = PrivateKeySigner::random();
399        let signer3 = PrivateKeySigner::random();
400        let signer4 = PrivateKeySigner::random();
401
402        let request = CreateProposalRequest::new(
403            "Test".into(),
404            "".into(),
405            signer1.address().as_slice().to_vec(),
406            4, // 4 expected voters
407            60,
408            false,
409        )
410        .unwrap();
411
412        let proposal = request.into_proposal().unwrap();
413        let config = ConsensusConfig::gossipsub();
414        let mut session = ConsensusSession::new(proposal, config);
415
416        // Round 1 -> Round 2 (first vote)
417        let vote1 = build_vote(&session.proposal, true, signer1).await.unwrap();
418        session.add_vote(vote1).unwrap();
419        assert_eq!(session.proposal.round, 2);
420
421        // Stay at round 2 (second vote)
422        let vote2 = build_vote(&session.proposal, false, signer2).await.unwrap();
423        session.add_vote(vote2).unwrap();
424        assert_eq!(session.proposal.round, 2);
425
426        // Stay at round 2 (third vote)
427        let vote3 = build_vote(&session.proposal, true, signer3).await.unwrap();
428        session.add_vote(vote3).unwrap();
429        assert_eq!(session.proposal.round, 2);
430
431        // Stay at round 2 (fourth vote) - should succeed
432        let vote4 = build_vote(&session.proposal, true, signer4).await.unwrap();
433        session.add_vote(vote4).unwrap();
434        assert_eq!(session.proposal.round, 2);
435        assert_eq!(session.votes.len(), 4);
436    }
437
438    #[tokio::test]
439    async fn enforce_max_rounds_p2p() {
440        // P2P defaults: max_rounds = 0 triggers dynamic calculation based on expected voters.
441        // For threshold=2/3 and expected_voters=5, max_round_limit = ceil(2n/3) = 4 votes.
442        // Round 1 = 0 votes, Round 2 = 1 vote, ... Round 5 = 4 votes.
443        let signer1 = PrivateKeySigner::random();
444        let signer2 = PrivateKeySigner::random();
445        let signer3 = PrivateKeySigner::random();
446        let signer4 = PrivateKeySigner::random();
447        let signer5 = PrivateKeySigner::random();
448
449        let request = CreateProposalRequest::new(
450            "Test".into(),
451            "".into(),
452            signer1.address().as_slice().to_vec(),
453            5,
454            60,
455            false,
456        )
457        .unwrap();
458
459        let proposal = request.into_proposal().unwrap();
460        let config = ConsensusConfig::p2p();
461        let mut session = ConsensusSession::new(proposal, config);
462
463        // Round 1 -> Round 2 (first vote, 1 vote total)
464        let vote1 = build_vote(&session.proposal, true, signer1).await.unwrap();
465        session.add_vote(vote1).unwrap();
466        assert_eq!(session.proposal.round, 2);
467        assert_eq!(session.votes.len(), 1);
468
469        // Round 2 -> Round 3 (second vote, 2 votes total) - should succeed
470        let vote2 = build_vote(&session.proposal, false, signer2).await.unwrap();
471        session.add_vote(vote2).unwrap();
472        assert_eq!(session.proposal.round, 3);
473        assert_eq!(session.votes.len(), 2);
474
475        // Round 3 -> Round 4 (third vote, 3 votes total) - should succeed
476        let vote3 = build_vote(&session.proposal, true, signer3).await.unwrap();
477        session.add_vote(vote3).unwrap();
478        assert_eq!(session.proposal.round, 4);
479        assert_eq!(session.votes.len(), 3);
480
481        // Round 4 -> Round 5 (fourth vote, 4 votes total) - should succeed (dynamic limit = 4)
482        let vote4 = build_vote(&session.proposal, true, signer4).await.unwrap();
483        session.add_vote(vote4).unwrap();
484        assert_eq!(session.proposal.round, 5);
485        assert_eq!(session.votes.len(), 4);
486
487        // Fifth vote would exceed dynamic max_round_limit (=4 votes)
488        let vote5 = build_vote(&session.proposal, true, signer5).await.unwrap();
489        let err = session.add_vote(vote5).unwrap_err();
490        assert!(matches!(err, ConsensusError::MaxRoundsExceeded));
491    }
492}