hashgraph-like-consensus 0.2.0

A lightweight Rust library for making binary decisions in networks using hashgraph-style consensus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
//! Consensus session and configuration types.
//!
//! A [`ConsensusSession`] tracks the lifecycle of a single proposal — from creation
//! through vote collection to a final [`ConsensusState`]. Each session carries its
//! own [`ConsensusConfig`] that governs thresholds, timeouts, and round limits.

use std::{collections::HashMap, time::Duration};

use crate::{
    error::ConsensusError,
    protos::consensus::v1::{Proposal, Vote},
    scope_config::{NetworkType, ScopeConfig},
    types::SessionTransition,
    utils::{
        calculate_consensus_result, calculate_max_rounds, current_timestamp, validate_proposal,
        validate_proposal_timestamp, validate_vote, validate_vote_chain,
    },
};

/// Per-session configuration controlling how consensus is reached.
///
/// Start from [`ConsensusConfig::gossipsub()`] or [`ConsensusConfig::p2p()`] to get the
/// transport-specific defaults, then adjust with the builder methods
/// [`with_timeout`](Self::with_timeout), [`with_threshold`](Self::with_threshold) and
/// [`with_liveness_criteria`](Self::with_liveness_criteria).
///
/// All fields are private; each one has a getter of the same name.
#[derive(Debug, Clone)]
pub struct ConsensusConfig {
    /// Fraction of expected voters that must have voted before consensus can be reached.
    ///
    /// The initial value comes from the scope configuration (documented default: 2/3).
    /// Values set through [`with_threshold`](Self::with_threshold) are validated first.
    consensus_threshold: f64,
    /// How long to wait before timing out if consensus isn't reached.
    ///
    /// Values set through [`with_timeout`](Self::with_timeout) are validated first.
    consensus_timeout: Duration,
    /// Configured round cap; its meaning depends on `use_gossipsub_rounds`.
    ///
    /// - Gossipsub mode: upper bound on the round number (falls back to 2 when the scope
    ///   configuration has no override).
    /// - P2P mode: upper bound on the number of votes. The value 0 is a sentinel meaning
    ///   "derive the cap from the expected voter count and the threshold" (falls back to 0
    ///   when the scope configuration has no override).
    max_rounds: u32,
    /// Selects the round-accounting model.
    ///
    /// When true, the first vote moves the proposal from round 1 to round 2 and every later
    /// vote stays in round 2. When false, every vote increments the round by one.
    use_gossipsub_rounds: bool,
    /// How silent peers are counted: as YES when `true`, as NO when `false`.
    liveness_criteria: bool,
}

impl From<NetworkType> for ConsensusConfig {
    /// Builds the configuration from the scope defaults associated with `network_type`.
    ///
    /// This is a two-step conversion: `NetworkType` -> `ScopeConfig` -> `ConsensusConfig`.
    fn from(network_type: NetworkType) -> Self {
        let scope_defaults = ScopeConfig::from(network_type);
        scope_defaults.into()
    }
}

impl From<ScopeConfig> for ConsensusConfig {
    /// Translates scope-level defaults into a per-session configuration.
    ///
    /// The network type decides both the round-accounting model and the round cap used
    /// when the scope does not provide `max_rounds_override`.
    fn from(config: ScopeConfig) -> Self {
        let use_gossipsub_rounds = match config.network_type {
            NetworkType::Gossipsub => true,
            NetworkType::P2P => false,
        };

        // Gossipsub falls back to the fixed two-round flow. P2P falls back to 0, the
        // sentinel that triggers dynamic calculation of the cap.
        let fallback_rounds = if use_gossipsub_rounds { 2 } else { 0 };
        let max_rounds = config.max_rounds_override.unwrap_or(fallback_rounds);

        Self::new(
            config.default_consensus_threshold,
            config.default_timeout,
            max_rounds,
            use_gossipsub_rounds,
            config.default_liveness_criteria_yes,
        )
    }
}

impl ConsensusConfig {
    /// Default configuration for P2P transport: derive round cap as ceil(2n/3).
    /// Max rounds is 0, so the round cap is calculated dynamically based on the expected voters count.
    pub fn p2p() -> Self {
        ConsensusConfig::from(NetworkType::P2P)
    }

    /// Default configuration for Gossipsub transport: fixed 2-round flow.
    pub fn gossipsub() -> Self {
        ConsensusConfig::from(NetworkType::Gossipsub)
    }

    /// Set consensus timeout (validated) and return the updated config.
    pub fn with_timeout(mut self, consensus_timeout: Duration) -> Result<Self, ConsensusError> {
        crate::utils::validate_timeout(consensus_timeout)?;
        self.consensus_timeout = consensus_timeout;
        Ok(self)
    }

    /// Set consensus threshold (validated) and return the updated config.
    pub fn with_threshold(mut self, consensus_threshold: f64) -> Result<Self, ConsensusError> {
        crate::utils::validate_threshold(consensus_threshold)?;
        self.consensus_threshold = consensus_threshold;
        Ok(self)
    }

    /// Set liveness criteria and return the updated config.
    pub fn with_liveness_criteria(mut self, liveness_criteria: bool) -> Self {
        self.liveness_criteria = liveness_criteria;
        self
    }

    /// Create a new ConsensusConfig with the given values.
    /// This is used internally for scope configuration conversion.
    pub(crate) fn new(
        consensus_threshold: f64,
        consensus_timeout: Duration,
        max_rounds: u32,
        use_gossipsub_rounds: bool,
        liveness_criteria: bool,
    ) -> Self {
        Self {
            consensus_threshold,
            consensus_timeout,
            max_rounds,
            use_gossipsub_rounds,
            liveness_criteria,
        }
    }

    fn max_round_limit(&self, expected_voters_count: u32) -> u32 {
        if self.use_gossipsub_rounds {
            self.max_rounds
        } else if self.max_rounds == 0 {
            calculate_max_rounds(expected_voters_count, self.consensus_threshold)
        } else {
            self.max_rounds
        }
    }

    /// Maximum time to wait for consensus before timing out.
    pub fn consensus_timeout(&self) -> Duration {
        self.consensus_timeout
    }

    /// Fraction of expected voters required before consensus can be determined.
    pub fn consensus_threshold(&self) -> f64 {
        self.consensus_threshold
    }

    /// Whether silent peers are counted as YES (`true`) or NO (`false`).
    pub fn liveness_criteria(&self) -> bool {
        self.liveness_criteria
    }

    /// Maximum number of voting rounds allowed.
    pub fn max_rounds(&self) -> u32 {
        self.max_rounds
    }

    /// Whether Gossipsub-style fixed 2-round semantics are in effect.
    pub fn use_gossipsub_rounds(&self) -> bool {
        self.use_gossipsub_rounds
    }
}

/// Lifecycle state of a consensus session.
///
/// A session starts as [`Active`](Self::Active). It becomes
/// [`ConsensusReached`](Self::ConsensusReached) once the recorded votes decide the outcome,
/// or [`Failed`](Self::Failed) when it can no longer produce a result.
///
/// The type implements `PartialEq`/`Eq`, so states can be compared directly
/// (`state == ConsensusState::Active`) in addition to pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConsensusState {
    /// Votes still accepted.
    Active,
    /// Voting closed with a boolean result (`true` = YES, `false` = NO).
    ConsensusReached(bool),
    /// Consensus could not be determined (for example when a round limit is exceeded,
    /// or typically on timeout with insufficient votes).
    Failed,
}

/// State of a single proposal's consensus run: the proposal, the votes recorded so far,
/// the current lifecycle state and the configuration that governs it.
///
/// `votes` and `proposal.votes` hold the same votes: the map is keyed by owner to enforce
/// one vote per participant, the proposal keeps them in insertion order.
#[derive(Debug, Clone)]
pub struct ConsensusSession {
    /// Current snapshot of the proposal including aggregated votes.
    pub proposal: Proposal,
    /// Session state tracking whether voting is still open.
    pub state: ConsensusState,
    /// Map of vote owner -> vote to enforce single vote per participant.
    pub votes: HashMap<Vec<u8>, Vote>, // key: vote_owner bytes, value: that owner's vote
    /// Seconds since Unix epoch when the session was created
    /// (0 if the current time could not be obtained at creation).
    pub created_at: u64,
    /// Per-session runtime configuration.
    pub config: ConsensusConfig,
}

impl ConsensusSession {
    /// Create a new session from a validated proposal (no votes).
    /// Used when creating proposals locally where we know the proposal is clean.
    fn new(proposal: Proposal, config: ConsensusConfig) -> Self {
        let now = current_timestamp().unwrap_or(0);
        Self {
            proposal,
            state: ConsensusState::Active,
            votes: HashMap::new(),
            created_at: now,
            config,
        }
    }

    /// Create a session from a proposal, validating the proposal and all votes.
    /// This validates the proposal structure, vote chain, and individual votes before creating the session.
    /// The session is created with votes already processed and rounds correctly set.
    pub fn from_proposal(
        proposal: Proposal,
        config: ConsensusConfig,
    ) -> Result<(Self, SessionTransition), ConsensusError> {
        validate_proposal(&proposal)?;

        // Create clean proposal for session (votes will be added via initialize_with_votes)
        let existing_votes = proposal.votes.clone();
        let mut clean_proposal = proposal.clone();
        clean_proposal.votes.clear();
        // Always start with round 1 for new proposals as we at least have the proposal owner's vote.
        clean_proposal.round = 1;

        let mut session = Self::new(clean_proposal, config);
        let transition = session.initialize_with_votes(
            existing_votes,
            proposal.expiration_timestamp,
            proposal.timestamp,
        )?;

        Ok((session, transition))
    }

    /// Add a vote to the session.
    pub(crate) fn add_vote(&mut self, vote: Vote) -> Result<SessionTransition, ConsensusError> {
        match self.state {
            ConsensusState::Active => {
                validate_proposal_timestamp(self.proposal.expiration_timestamp)?;

                // Check if adding this vote would exceed round limits
                self.check_round_limit(1)?;

                if self.votes.contains_key(&vote.vote_owner) {
                    return Err(ConsensusError::DuplicateVote);
                }
                self.votes.insert(vote.vote_owner.clone(), vote.clone());
                self.proposal.votes.push(vote.clone());

                self.update_round(1);
                Ok(self.check_consensus())
            }
            ConsensusState::ConsensusReached(res) => Ok(SessionTransition::ConsensusReached(res)),
            _ => Err(ConsensusError::SessionNotActive),
        }
    }

    /// Initialize session with multiple votes, validating all before adding any.
    /// Validates duplicates, vote chain, and individual votes, then adds all atomically.
    pub(crate) fn initialize_with_votes(
        &mut self,
        votes: Vec<Vote>,
        expiration_timestamp: u64,
        creation_time: u64,
    ) -> Result<SessionTransition, ConsensusError> {
        if !matches!(self.state, ConsensusState::Active) {
            return Err(ConsensusError::SessionNotActive);
        }

        validate_proposal_timestamp(expiration_timestamp)?;

        if votes.is_empty() {
            return Ok(SessionTransition::StillActive);
        }

        let mut seen_owners = std::collections::HashSet::new();
        for vote in &votes {
            if !seen_owners.insert(&vote.vote_owner) {
                return Err(ConsensusError::DuplicateVote);
            }
        }

        // Each distinct voter can vote at most once, so the batch size
        // is bounded by expected_voters_count (u32). Reject early if violated.
        if votes.len() > self.proposal.expected_voters_count as usize {
            self.state = ConsensusState::Failed;
            return Err(ConsensusError::MaxRoundsExceeded);
        }

        validate_vote_chain(&votes)?;
        for vote in &votes {
            validate_vote(vote, expiration_timestamp, creation_time)?;
        }

        self.check_round_limit(votes.len())?;
        self.update_round(votes.len());

        for vote in votes {
            self.votes.insert(vote.vote_owner.clone(), vote.clone());
            self.proposal.votes.push(vote);
        }

        Ok(self.check_consensus())
    }

    /// Check if adding votes would exceed round limits.
    ///
    /// Unifies logic for both single-vote and batch processing:
    /// - For a single vote, pass `vote_count: 1`.
    /// - For P2P: Calculates `(current_round - 1) + vote_count`.
    /// - For Gossipsub: Moves to Round 2 if `vote_count > 0`.
    fn check_round_limit(&mut self, vote_count: usize) -> Result<(), ConsensusError> {
        // vote_count cannot exceed expected_voters_count (u32); reject if it does
        if vote_count > self.proposal.expected_voters_count as usize {
            self.state = ConsensusState::Failed;
            return Err(ConsensusError::MaxRoundsExceeded);
        }

        // Determine the value to compare against the limit based on configuration
        let projected_value = if self.config.use_gossipsub_rounds {
            // Gossipsub Logic:
            // RFC Section 2.5.3: Round 1 = proposal, Round 2 = all parallel votes.
            // If we are already at Round 2, we stay there.
            // If we are at Round 1 and adding ANY votes (> 0), we move to Round 2.
            if self.proposal.round == 2 || (self.proposal.round == 1 && vote_count > 0) {
                2
            } else {
                self.proposal.round // Stays at 1 if vote_count is 0, or handles edge cases
            }
        } else {
            // P2P Logic:
            // RFC Section 2.5.3: Round increments per vote.
            // Current existing votes = round - 1.
            // Projected total = Existing votes + New votes.
            // vote_count is bounded by expected_voters_count (u32), safe to cast
            let current_votes = self.proposal.round.saturating_sub(1);
            current_votes.saturating_add(vote_count as u32)
        };

        if projected_value
            > self
                .config
                .max_round_limit(self.proposal.expected_voters_count)
        {
            self.state = ConsensusState::Failed;
            return Err(ConsensusError::MaxRoundsExceeded);
        }

        Ok(())
    }

    /// Update round after adding votes.
    ///
    /// Unifies logic for round updates:
    /// - Gossipsub: Moves from Round 1 -> 2 if adding votes. Stays at 2 otherwise.
    /// - P2P: Adds the number of votes to the current round.
    fn update_round(&mut self, vote_count: usize) {
        if self.config.use_gossipsub_rounds {
            // RFC Section 2.5.3: Gossipsub
            // Round 1 = proposal creation.
            // Round 2 = all subsequent votes.
            // If we are at Round 1 and add ANY votes (>0), we promote to Round 2.
            if self.proposal.round == 1 && vote_count > 0 {
                self.proposal.round = 2;
            }
        } else {
            // RFC Section 2.5.3: P2P
            // Round increments for every vote added.
            // vote_count is bounded by expected_voters_count (u32), safe to cast
            self.proposal.round = self.proposal.round.saturating_add(vote_count as u32);
        }
    }

    /// RFC Section 4 (Liveness): Check if consensus reached
    /// - n > 2: need >n/2 YES votes among at least 2n/3 distinct peers
    /// - n ≤ 2: require unanimous YES votes
    /// - Equality: use liveness_criteria_yes
    fn check_consensus(&mut self) -> SessionTransition {
        let expected_voters = self.proposal.expected_voters_count;
        let threshold = self.config.consensus_threshold;
        let liveness = self.proposal.liveness_criteria_yes;

        match calculate_consensus_result(&self.votes, expected_voters, threshold, liveness, false) {
            Some(result) => {
                self.state = ConsensusState::ConsensusReached(result);
                SessionTransition::ConsensusReached(result)
            }
            None => {
                self.state = ConsensusState::Active;
                SessionTransition::StillActive
            }
        }
    }

    /// Check if this proposal is still accepting votes.
    pub fn is_active(&self) -> bool {
        matches!(self.state, ConsensusState::Active)
    }

    /// Get the consensus result if one has been reached.
    ///
    /// Returns `Ok(true)` for YES, `Ok(false)` for NO, or `Err(ConsensusError::ConsensusNotReached)` if consensus
    /// hasn't been reached yet.
    pub fn get_consensus_result(&self) -> Result<bool, ConsensusError> {
        if let ConsensusState::ConsensusReached(result) = self.state {
            Ok(result)
        } else {
            Err(ConsensusError::ConsensusNotReached)
        }
    }
}

/// Unit tests for round accounting, configuration builders and session state handling.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use alloy::signers::local::PrivateKeySigner;

    use crate::{
        error::ConsensusError,
        session::{ConsensusConfig, ConsensusSession, ConsensusState},
        types::CreateProposalRequest,
        utils::build_vote,
    };

    /// Gossipsub accounting: every vote after the first stays in round 2.
    #[tokio::test]
    async fn enforce_max_rounds_gossipsub() {
        // Gossipsub: max_rounds = 2 means round 1 (proposal) and round 2 (all votes)
        // Should allow multiple votes in round 2, but not exceed round 2
        let signer1 = PrivateKeySigner::random();
        let signer2 = PrivateKeySigner::random();
        let signer3 = PrivateKeySigner::random();
        let signer4 = PrivateKeySigner::random();

        let request = CreateProposalRequest::new(
            "Test".into(),
            "".into(),
            signer1.address().as_slice().to_vec(),
            4, // 4 expected voters
            60,
            false,
        )
        .unwrap();

        let proposal = request.into_proposal().unwrap();
        let config = ConsensusConfig::gossipsub();
        let mut session = ConsensusSession::new(proposal, config);

        // Round 1 -> Round 2 (first vote)
        let vote1 = build_vote(&session.proposal, true, signer1).await.unwrap();
        session.add_vote(vote1).unwrap();
        assert_eq!(session.proposal.round, 2);

        // Stay at round 2 (second vote)
        let vote2 = build_vote(&session.proposal, false, signer2).await.unwrap();
        session.add_vote(vote2).unwrap();
        assert_eq!(session.proposal.round, 2);

        // Stay at round 2 (third vote)
        let vote3 = build_vote(&session.proposal, true, signer3).await.unwrap();
        session.add_vote(vote3).unwrap();
        assert_eq!(session.proposal.round, 2);

        // Stay at round 2 (fourth vote) - should succeed
        let vote4 = build_vote(&session.proposal, true, signer4).await.unwrap();
        session.add_vote(vote4).unwrap();
        assert_eq!(session.proposal.round, 2);
        assert_eq!(session.votes.len(), 4);
    }

    /// P2P accounting: each vote increments the round until the derived cap is hit,
    /// at which point the vote is rejected and the session is closed as failed.
    #[tokio::test]
    async fn enforce_max_rounds_p2p() {
        // P2P defaults: max_rounds = 0 triggers dynamic calculation based on expected voters.
        // For threshold=2/3 and expected_voters=5, max_round_limit = ceil(2n/3) = 4 votes.
        // Round 1 = 0 votes, Round 2 = 1 vote, ... Round 5 = 4 votes.
        let signer1 = PrivateKeySigner::random();
        let signer2 = PrivateKeySigner::random();
        let signer3 = PrivateKeySigner::random();
        let signer4 = PrivateKeySigner::random();
        let signer5 = PrivateKeySigner::random();

        let request = CreateProposalRequest::new(
            "Test".into(),
            "".into(),
            signer1.address().as_slice().to_vec(),
            5,
            60,
            false,
        )
        .unwrap();

        let proposal = request.into_proposal().unwrap();
        let config = ConsensusConfig::p2p();
        let mut session = ConsensusSession::new(proposal, config);

        // Round 1 -> Round 2 (first vote, 1 vote total)
        let vote1 = build_vote(&session.proposal, true, signer1).await.unwrap();
        session.add_vote(vote1).unwrap();
        assert_eq!(session.proposal.round, 2);
        assert_eq!(session.votes.len(), 1);

        // Round 2 -> Round 3 (second vote, 2 votes total) - should succeed
        let vote2 = build_vote(&session.proposal, false, signer2).await.unwrap();
        session.add_vote(vote2).unwrap();
        assert_eq!(session.proposal.round, 3);
        assert_eq!(session.votes.len(), 2);

        // Round 3 -> Round 4 (third vote, 3 votes total) - should succeed
        let vote3 = build_vote(&session.proposal, true, signer3).await.unwrap();
        session.add_vote(vote3).unwrap();
        assert_eq!(session.proposal.round, 4);
        assert_eq!(session.votes.len(), 3);

        // Round 4 -> Round 5 (fourth vote, 4 votes total) - should succeed (dynamic limit = 4)
        let vote4 = build_vote(&session.proposal, true, signer4).await.unwrap();
        session.add_vote(vote4).unwrap();
        assert_eq!(session.proposal.round, 5);
        assert_eq!(session.votes.len(), 4);

        // Fifth vote would exceed dynamic max_round_limit (=4 votes)
        let vote5 = build_vote(&session.proposal, true, signer5).await.unwrap();
        let err = session.add_vote(vote5).unwrap_err();
        assert!(matches!(err, ConsensusError::MaxRoundsExceeded));

        // Exceeding the limit also closes the session and leaves the recorded votes alone.
        assert!(matches!(session.state, ConsensusState::Failed));
        assert_eq!(session.votes.len(), 4);
    }

    /// Builder methods validate their input and every getter reflects the stored value.
    #[test]
    fn consensus_config_builder_and_getters_cover_edges() {
        let cfg = ConsensusConfig::gossipsub()
            .with_threshold(0.75)
            .unwrap()
            .with_timeout(Duration::from_secs(42))
            .unwrap()
            .with_liveness_criteria(false);

        assert_eq!(cfg.consensus_threshold(), 0.75);
        assert_eq!(cfg.consensus_timeout(), Duration::from_secs(42));
        assert!(!cfg.liveness_criteria());

        // The round-accounting model follows the transport the config was built for.
        assert!(cfg.use_gossipsub_rounds());
        assert!(!ConsensusConfig::p2p().use_gossipsub_rounds());

        let err = ConsensusConfig::gossipsub()
            .with_threshold(1.1)
            .unwrap_err();
        assert!(matches!(err, ConsensusError::InvalidConsensusThreshold));

        let err = ConsensusConfig::gossipsub()
            .with_timeout(Duration::from_secs(0))
            .unwrap_err();
        assert!(matches!(err, ConsensusError::InvalidTimeout));

        // Covers max_round_limit branch when P2P-like mode uses explicit max_rounds (non-zero).
        let explicit = ConsensusConfig::new(2.0 / 3.0, Duration::from_secs(60), 7, false, true);
        assert_eq!(explicit.max_rounds(), 7);
        assert_eq!(explicit.max_round_limit(100), 7);
    }

    /// `add_vote` on a closed session: failed sessions reject, decided sessions report
    /// the existing result.
    #[tokio::test]
    async fn add_vote_rejects_non_active_and_reports_reached_when_finalized() {
        let signer = PrivateKeySigner::random();
        let request = CreateProposalRequest::new(
            "Test".into(),
            "".into(),
            signer.address().as_slice().to_vec(),
            3,
            60,
            true,
        )
        .unwrap();
        let proposal = request.into_proposal().unwrap();

        // Failed sessions reject new votes.
        let mut failed_session =
            ConsensusSession::new(proposal.clone(), ConsensusConfig::gossipsub());
        failed_session.state = ConsensusState::Failed;
        let vote = build_vote(&failed_session.proposal, true, signer.clone())
            .await
            .unwrap();
        let err = failed_session.add_vote(vote).unwrap_err();
        assert!(matches!(err, ConsensusError::SessionNotActive));

        // Finalized sessions return existing transition/result.
        let mut finalized_session = ConsensusSession::new(proposal, ConsensusConfig::gossipsub());
        finalized_session.state = ConsensusState::ConsensusReached(true);
        let vote = build_vote(&finalized_session.proposal, true, signer)
            .await
            .unwrap();
        let transition = finalized_session.add_vote(vote).unwrap();
        assert!(matches!(
            transition,
            crate::types::SessionTransition::ConsensusReached(true)
        ));
    }

    /// Early-exit paths of `initialize_with_votes` plus the zero-vote round projection.
    #[tokio::test]
    async fn initialize_with_votes_non_active_duplicate_and_zero_votes_paths() {
        let signer = PrivateKeySigner::random();
        let request = CreateProposalRequest::new(
            "Test".into(),
            "".into(),
            signer.address().as_slice().to_vec(),
            4,
            60,
            true,
        )
        .unwrap();
        let proposal = request.into_proposal().unwrap();

        // Non-active sessions reject initialization.
        let mut inactive = ConsensusSession::new(proposal.clone(), ConsensusConfig::gossipsub());
        inactive.state = ConsensusState::Failed;
        let err = inactive
            .initialize_with_votes(vec![], proposal.expiration_timestamp, proposal.timestamp)
            .unwrap_err();
        assert!(matches!(err, ConsensusError::SessionNotActive));

        // Duplicate owners are rejected before chain/signature checks.
        let mut dup_session = ConsensusSession::new(proposal.clone(), ConsensusConfig::gossipsub());
        let vote1 = build_vote(&dup_session.proposal, true, signer.clone())
            .await
            .unwrap();
        let vote2 = build_vote(&dup_session.proposal, false, signer)
            .await
            .unwrap();
        let err = dup_session
            .initialize_with_votes(
                vec![vote1, vote2],
                proposal.expiration_timestamp,
                proposal.timestamp,
            )
            .unwrap_err();
        assert!(matches!(err, ConsensusError::DuplicateVote));

        // Explicitly exercise gossipsub projected round branch where vote_count == 0.
        let mut zero_votes = ConsensusSession::new(proposal, ConsensusConfig::gossipsub());
        zero_votes.check_round_limit(0).unwrap();
    }

    /// A vote count that does not fit in `u32` must be rejected rather than truncated.
    ///
    /// Only meaningful where `usize` is wider than `u32`, so the test is compiled for
    /// 64-bit targets only instead of silently passing elsewhere.
    #[test]
    #[cfg(target_pointer_width = "64")]
    fn p2p_round_limit_should_reject_effectively_huge_vote_count() {
        let signer = PrivateKeySigner::random();
        let request = CreateProposalRequest::new(
            "TruncationTest".into(),
            vec![],
            signer.address().as_slice().to_vec(),
            1,
            60,
            true,
        )
        .unwrap();

        let proposal = request.into_proposal().unwrap();
        let mut session = ConsensusSession::new(proposal, ConsensusConfig::p2p());

        let wrapped_vote_count = (u32::MAX as usize) + 1;

        // Desired behavior: an effectively huge batch must be rejected by round-limit checks.
        let result = session.check_round_limit(wrapped_vote_count);
        assert!(
            matches!(result, Err(ConsensusError::MaxRoundsExceeded)),
            "effectively huge vote_count should not pass round-limit checks"
        );
        // The rejection also closes the session.
        assert!(matches!(session.state, ConsensusState::Failed));
    }

    /// `update_round` saturates instead of overflowing at the `u32` boundary.
    #[test]
    fn p2p_update_round_should_advance_for_max_u32_vote_count() {
        let signer = PrivateKeySigner::random();
        let request = CreateProposalRequest::new(
            "RoundUpdateMax".into(),
            vec![],
            signer.address().as_slice().to_vec(),
            u32::MAX,
            60,
            true,
        )
        .unwrap();

        let proposal = request.into_proposal().unwrap();
        let mut session = ConsensusSession::new(proposal, ConsensusConfig::p2p());
        let starting_round = session.proposal.round;

        // vote_count at the u32 boundary should still advance the round via saturating_add
        session.update_round(u32::MAX as usize);

        assert!(
            session.proposal.round > starting_round,
            "round should advance when max u32 vote_count is applied"
        );
        assert_eq!(session.proposal.round, u32::MAX);
    }
}