Skip to main content

hotmint_consensus/
engine.rs

1use ruc::*;
2
3use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5
6use crate::application::Application;
7use crate::commit::{CommitResult, try_commit};
8use crate::evidence_store::EvidenceStore;
9use crate::leader;
10use crate::liveness::{LivenessTracker, OfflineEvidence};
11use crate::network::NetworkSink;
12use crate::pacemaker::{Pacemaker, PacemakerConfig};
13use crate::state::{ConsensusState, ViewStep};
14use crate::store::BlockStore;
15use crate::view_protocol::{self, ViewEntryTrigger};
16use crate::vote_collector::VoteCollector;
17
18use hotmint_types::epoch::Epoch;
19use hotmint_types::vote::VoteType;
20use hotmint_types::*;
21use tokio::sync::mpsc;
22use tracing::{error, info, warn};
23
24/// Shared block store type used by the engine, RPC, and sync responder.
25pub type SharedBlockStore = Arc<parking_lot::RwLock<Box<dyn BlockStore>>>;
26
27/// Trait for persisting critical consensus state across restarts.
28pub trait StatePersistence: Send {
29    fn save_current_view(&mut self, view: ViewNumber);
30    fn save_locked_qc(&mut self, qc: &QuorumCertificate);
31    fn save_highest_qc(&mut self, qc: &QuorumCertificate);
32    fn save_last_committed_height(&mut self, height: Height);
33    fn save_current_epoch(&mut self, epoch: &Epoch);
34    fn save_last_app_hash(&mut self, hash: BlockHash);
35    fn save_pending_epoch(&mut self, epoch: Option<&Epoch>);
36    fn flush(&self);
37}
38
39/// Write-Ahead Log trait for commit crash recovery.
40pub trait Wal: Send {
41    /// Log intent to commit blocks up to `target_height`. Must fsync before returning.
42    fn log_commit_intent(&mut self, target_height: Height) -> std::io::Result<()>;
43    /// Log that commit succeeded. May truncate the WAL.
44    fn log_commit_done(&mut self, target_height: Height) -> std::io::Result<()>;
45}
46
47pub struct ConsensusEngine {
48    state: ConsensusState,
49    store: SharedBlockStore,
50    network: Box<dyn NetworkSink>,
51    app: Box<dyn Application>,
52    signer: Box<dyn Signer>,
53    verifier: Box<dyn Verifier>,
54    vote_collector: VoteCollector,
55    pacemaker: Pacemaker,
56    pacemaker_config: PacemakerConfig,
57    msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
58    /// Collected unique status cert senders (for leader, per view)
59    status_senders: HashSet<ValidatorId>,
60    /// The QC formed in this view's first voting round (used to build DoubleCert)
61    current_view_qc: Option<QuorumCertificate>,
62    /// Pending epoch transition (set by try_commit, applied in advance_view_to)
63    pending_epoch: Option<Epoch>,
64    /// Optional state persistence (for crash recovery).
65    persistence: Option<Box<dyn StatePersistence>>,
66    /// Optional evidence store for persisting equivocation proofs.
67    evidence_store: Option<Box<dyn EvidenceStore>>,
68    /// Tracks per-validator liveness for offline slashing.
69    liveness_tracker: LivenessTracker,
70    /// Optional write-ahead log for commit crash recovery.
71    wal: Option<Box<dyn Wal>>,
72    /// B-3: Per-sender inbound message rate limiter.
73    msg_rate_limiter: HashMap<ValidatorId, (std::time::Instant, u32)>,
74}
75
76/// Configuration for ConsensusEngine.
77pub struct EngineConfig {
78    pub verifier: Box<dyn Verifier>,
79    pub pacemaker: Option<PacemakerConfig>,
80    pub persistence: Option<Box<dyn StatePersistence>>,
81    pub evidence_store: Option<Box<dyn EvidenceStore>>,
82    pub wal: Option<Box<dyn Wal>>,
83    /// Restored pending epoch transition (from crash recovery).
84    pub pending_epoch: Option<Epoch>,
85}
86
87impl EngineConfig {
88    /// Create an `EngineConfig` with the given verifier and defaults
89    /// (no custom pacemaker, no persistence, no evidence store).
90    pub fn new(verifier: Box<dyn Verifier>) -> Self {
91        Self {
92            verifier,
93            pacemaker: None,
94            persistence: None,
95            evidence_store: None,
96            wal: None,
97            pending_epoch: None,
98        }
99    }
100
101    /// Set a custom pacemaker configuration.
102    pub fn with_pacemaker(mut self, pacemaker: PacemakerConfig) -> Self {
103        self.pacemaker = Some(pacemaker);
104        self
105    }
106
107    /// Set a state persistence backend.
108    pub fn with_persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
109        self.persistence = Some(persistence);
110        self
111    }
112}
113
114/// Builder for constructing a `ConsensusEngine` with a fluent API.
115///
116/// # Example
117/// ```rust,ignore
118/// let engine = ConsensusEngineBuilder::new()
119///     .state(state)
120///     .store(store)
121///     .network(network)
122///     .app(app)
123///     .signer(signer)
124///     .messages(msg_rx)
125///     .verifier(verifier)
126///     .build()
127///     .expect("all required fields must be set");
128/// ```
129pub struct ConsensusEngineBuilder {
130    state: Option<ConsensusState>,
131    store: Option<SharedBlockStore>,
132    network: Option<Box<dyn NetworkSink>>,
133    app: Option<Box<dyn Application>>,
134    signer: Option<Box<dyn Signer>>,
135    msg_rx: Option<mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>>,
136    verifier: Option<Box<dyn Verifier>>,
137    pacemaker: Option<PacemakerConfig>,
138    persistence: Option<Box<dyn StatePersistence>>,
139    evidence_store: Option<Box<dyn EvidenceStore>>,
140    wal: Option<Box<dyn Wal>>,
141}
142
143impl ConsensusEngineBuilder {
144    pub fn new() -> Self {
145        Self {
146            state: None,
147            store: None,
148            network: None,
149            app: None,
150            signer: None,
151            msg_rx: None,
152            verifier: None,
153            pacemaker: None,
154            persistence: None,
155            evidence_store: None,
156            wal: None,
157        }
158    }
159
160    pub fn state(mut self, state: ConsensusState) -> Self {
161        self.state = Some(state);
162        self
163    }
164
165    pub fn store(mut self, store: SharedBlockStore) -> Self {
166        self.store = Some(store);
167        self
168    }
169
170    pub fn network(mut self, network: Box<dyn NetworkSink>) -> Self {
171        self.network = Some(network);
172        self
173    }
174
175    pub fn app(mut self, app: Box<dyn Application>) -> Self {
176        self.app = Some(app);
177        self
178    }
179
180    pub fn signer(mut self, signer: Box<dyn Signer>) -> Self {
181        self.signer = Some(signer);
182        self
183    }
184
185    pub fn messages(
186        mut self,
187        msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
188    ) -> Self {
189        self.msg_rx = Some(msg_rx);
190        self
191    }
192
193    pub fn verifier(mut self, verifier: Box<dyn Verifier>) -> Self {
194        self.verifier = Some(verifier);
195        self
196    }
197
198    pub fn pacemaker(mut self, config: PacemakerConfig) -> Self {
199        self.pacemaker = Some(config);
200        self
201    }
202
203    pub fn persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
204        self.persistence = Some(persistence);
205        self
206    }
207
208    pub fn evidence_store(mut self, store: Box<dyn EvidenceStore>) -> Self {
209        self.evidence_store = Some(store);
210        self
211    }
212
213    pub fn wal(mut self, wal: Box<dyn Wal>) -> Self {
214        self.wal = Some(wal);
215        self
216    }
217
218    pub fn build(self) -> ruc::Result<ConsensusEngine> {
219        let state = self.state.ok_or_else(|| ruc::eg!("state is required"))?;
220        let store = self.store.ok_or_else(|| ruc::eg!("store is required"))?;
221        let network = self
222            .network
223            .ok_or_else(|| ruc::eg!("network is required"))?;
224        let app = self.app.ok_or_else(|| ruc::eg!("app is required"))?;
225        let signer = self.signer.ok_or_else(|| ruc::eg!("signer is required"))?;
226        let msg_rx = self
227            .msg_rx
228            .ok_or_else(|| ruc::eg!("messages (msg_rx) is required"))?;
229        let verifier = self
230            .verifier
231            .ok_or_else(|| ruc::eg!("verifier is required"))?;
232
233        let config = EngineConfig {
234            verifier,
235            pacemaker: self.pacemaker,
236            persistence: self.persistence,
237            evidence_store: self.evidence_store,
238            wal: self.wal,
239            pending_epoch: None,
240        };
241
242        Ok(ConsensusEngine::new(
243            state, store, network, app, signer, msg_rx, config,
244        ))
245    }
246}
247
248impl Default for ConsensusEngineBuilder {
249    fn default() -> Self {
250        Self::new()
251    }
252}
253
254impl ConsensusEngine {
255    pub fn new(
256        state: ConsensusState,
257        store: SharedBlockStore,
258        network: Box<dyn NetworkSink>,
259        app: Box<dyn Application>,
260        signer: Box<dyn Signer>,
261        msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
262        config: EngineConfig,
263    ) -> Self {
264        let pc = config.pacemaker.unwrap_or_default();
265        Self {
266            state,
267            store,
268            network,
269            app,
270            signer,
271            verifier: config.verifier,
272            vote_collector: VoteCollector::new(),
273            pacemaker: Pacemaker::with_config(pc.clone()),
274            pacemaker_config: pc,
275            msg_rx,
276            status_senders: HashSet::new(),
277            current_view_qc: None,
278            pending_epoch: config.pending_epoch,
279            persistence: config.persistence,
280            evidence_store: config.evidence_store,
281            liveness_tracker: LivenessTracker::new(),
282            wal: config.wal,
283            msg_rate_limiter: HashMap::new(),
284        }
285    }
286
287    /// Bootstrap and start the event loop.
288    /// If persisted state was restored (current_view > 1), skip genesis bootstrap.
289    pub async fn run(mut self) {
290        // A-2: Check application info against consensus state for divergence.
291        // On any mismatch (height or app_hash), halt — do not participate with
292        // diverged state.
293        let app_info = self.app.info();
294        if app_info.last_block_height.as_u64() > 0 {
295            if app_info.last_block_height != self.state.last_committed_height {
296                error!(
297                    app_height = app_info.last_block_height.as_u64(),
298                    consensus_height = self.state.last_committed_height.as_u64(),
299                    "FATAL: application height differs from consensus state — halting"
300                );
301                std::process::exit(1);
302            }
303            if app_info.last_block_app_hash != self.state.last_app_hash {
304                error!(
305                    app_hash = %app_info.last_block_app_hash,
306                    consensus_hash = %self.state.last_app_hash,
307                    "FATAL: application app_hash differs from consensus state — halting"
308                );
309                std::process::exit(1);
310            }
311        }
312
313        if self.state.current_view.as_u64() <= 1 {
314            self.enter_genesis_view().await;
315        } else {
316            info!(
317                validator = %self.state.validator_id,
318                view = %self.state.current_view,
319                height = %self.state.last_committed_height,
320                "resuming from persisted state"
321            );
322            self.pacemaker.reset_timer();
323        }
324
325        loop {
326            let deadline = self.pacemaker.sleep_until_deadline();
327            tokio::pin!(deadline);
328
329            tokio::select! {
330                Some((sender, msg)) = self.msg_rx.recv() => {
331                    if let Err(e) = self.handle_message(sender, msg).await {
332                        warn!(validator = %self.state.validator_id, error = %e, "error handling message");
333                    }
334                }
335                _ = &mut deadline => {
336                    self.handle_timeout().await;
337                }
338            }
339        }
340    }
341
342    async fn enter_genesis_view(&mut self) {
343        // Create a synthetic genesis QC so the first leader can propose
344        let genesis_qc = QuorumCertificate {
345            block_hash: BlockHash::GENESIS,
346            view: ViewNumber::GENESIS,
347            aggregate_signature: AggregateSignature::new(
348                self.state.validator_set.validator_count(),
349            ),
350            epoch: self.state.current_epoch.number,
351        };
352        self.state.highest_qc = Some(genesis_qc);
353
354        let view = ViewNumber(1);
355        view_protocol::enter_view(
356            &mut self.state,
357            view,
358            ViewEntryTrigger::Genesis,
359            self.network.as_ref(),
360            self.signer.as_ref(),
361        );
362        self.pacemaker.reset_timer();
363
364        // If leader of genesis view, propose immediately
365        if self.state.is_leader() {
366            self.state.step = ViewStep::WaitingForStatus;
367            // In genesis, skip status wait — propose directly
368            self.try_propose().await;
369        }
370    }
371
372    fn try_propose(
373        &mut self,
374    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
375        Box::pin(async move {
376            // Collect pending evidence to embed in the block (C-3).
377            let pending_evidence = self
378                .evidence_store
379                .as_ref()
380                .map(|s| s.get_pending())
381                .unwrap_or_default();
382
383            // Propose is synchronous; acquire, run, and release the lock before any await.
384            let proposed_block = {
385                let mut store = self.store.write();
386                view_protocol::propose(
387                    &mut self.state,
388                    store.as_mut(),
389                    self.network.as_ref(),
390                    self.app.as_ref(),
391                    self.signer.as_ref(),
392                    pending_evidence,
393                )
394            }; // lock released here
395
396            match proposed_block {
397                Ok(block) => {
398                    // Leader votes for its own block
399                    self.leader_self_vote(block.hash).await;
400                }
401                Err(e) => {
402                    warn!(
403                        validator = %self.state.validator_id,
404                        error = %e,
405                        "failed to propose"
406                    );
407                }
408            }
409        })
410    }
411
412    async fn leader_self_vote(&mut self, block_hash: BlockHash) {
413        let vote_bytes = Vote::signing_bytes(
414            &self.state.chain_id_hash,
415            self.state.current_epoch.number,
416            self.state.current_view,
417            &block_hash,
418            VoteType::Vote,
419        );
420        let signature = self.signer.sign(&vote_bytes);
421        let vote = Vote {
422            block_hash,
423            view: self.state.current_view,
424            validator: self.state.validator_id,
425            signature,
426            vote_type: VoteType::Vote,
427            extension: None,
428        };
429        match self.vote_collector.add_vote(
430            &self.state.validator_set,
431            vote,
432            self.state.current_epoch.number,
433        ) {
434            Ok(result) => {
435                self.handle_equivocation(&result);
436                if let Some(qc) = result.qc {
437                    self.on_qc_formed(qc).await;
438                }
439            }
440            Err(e) => warn!(error = %e, "failed to add self vote"),
441        }
442    }
443}
444
445/// Verify the per-sender individual signature on a consensus message before relaying.
446///
447/// Only checks signatures that can be attributed to a single known validator using the
448/// provided key map.  Aggregate certificates (TimeoutCert, justify/prepare QCs) are
449/// intentionally not fully re-verified here — the receiving engine always does the full
450/// check.  Messages whose signing bytes depend on receiver state (StatusCert needs
451/// `current_view`) are also allowed through; the engine will reject them if invalid.
452///
453/// `ordered_validators` is the validator list in round-robin order (same order used by
454/// `ValidatorSet::leader_for_view`).  Pass an empty slice to skip the leader-for-view
455/// check (e.g., in tests where the set is not available).
456///
457/// Returns `false` when:
458/// - The claimed sender is not in `validator_keys` (unknown/non-validator peer), OR
459/// - The individual signature is cryptographically invalid, OR
460/// - For Prepare: the sender is not the leader for the certificate's view.
461pub fn verify_relay_sender(
462    sender: ValidatorId,
463    msg: &ConsensusMessage,
464    validator_keys: &HashMap<ValidatorId, hotmint_types::crypto::PublicKey>,
465    ordered_validators: &[ValidatorId],
466    chain_id_hash: &[u8; 32],
467    epoch: hotmint_types::epoch::EpochNumber,
468) -> bool {
469    use hotmint_crypto::Ed25519Verifier;
470    use hotmint_types::Verifier;
471    use hotmint_types::vote::Vote;
472    let verifier = Ed25519Verifier;
473    match msg {
474        ConsensusMessage::Propose {
475            block,
476            justify,
477            signature,
478            ..
479        } => {
480            let Some(pk) = validator_keys.get(&block.proposer) else {
481                return false;
482            };
483            let bytes =
484                crate::view_protocol::proposal_signing_bytes(chain_id_hash, epoch, block, justify);
485            Verifier::verify(&verifier, pk, &bytes, signature)
486        }
487        ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
488            let Some(pk) = validator_keys.get(&vote.validator) else {
489                return false;
490            };
491            let bytes = Vote::signing_bytes(
492                chain_id_hash,
493                epoch,
494                vote.view,
495                &vote.block_hash,
496                vote.vote_type,
497            );
498            Verifier::verify(&verifier, pk, &bytes, &vote.signature)
499        }
500        ConsensusMessage::Prepare {
501            certificate,
502            signature,
503        } => {
504            // Prepare is broadcast by the current leader. Verify that the relay
505            // sender is the leader for this view, then check the signature.
506            if !ordered_validators.is_empty() {
507                let n = ordered_validators.len();
508                let expected_leader = ordered_validators[certificate.view.as_u64() as usize % n];
509                if sender != expected_leader {
510                    return false;
511                }
512            }
513            let Some(pk) = validator_keys.get(&sender) else {
514                return false;
515            };
516            let bytes =
517                crate::view_protocol::prepare_signing_bytes(chain_id_hash, epoch, certificate);
518            Verifier::verify(&verifier, pk, &bytes, signature)
519        }
520        ConsensusMessage::Wish {
521            target_view,
522            validator,
523            highest_qc,
524            signature,
525        } => {
526            let Some(pk) = validator_keys.get(validator) else {
527                return false;
528            };
529            let bytes = crate::pacemaker::wish_signing_bytes(
530                chain_id_hash,
531                epoch,
532                *target_view,
533                highest_qc.as_ref(),
534            );
535            Verifier::verify(&verifier, pk, &bytes, signature)
536        }
537        ConsensusMessage::TimeoutCert(tc) => {
538            // Full relay verification: verify each signer's signature + quorum check.
539            let target_view = ViewNumber(tc.view.as_u64() + 1);
540            let n = ordered_validators.len();
541            if n == 0 || tc.aggregate_signature.signers.len() != n {
542                return false;
543            }
544            let mut sig_idx = 0usize;
545            let mut verified_count = 0usize;
546            for (i, &signed) in tc.aggregate_signature.signers.iter().enumerate() {
547                if !signed {
548                    continue;
549                }
550                if i >= n {
551                    return false;
552                }
553                let vid = ordered_validators[i];
554                let Some(pk) = validator_keys.get(&vid) else {
555                    return false;
556                };
557                let hqc = tc.highest_qcs.get(i).and_then(|h| h.as_ref());
558                let bytes =
559                    crate::pacemaker::wish_signing_bytes(chain_id_hash, epoch, target_view, hqc);
560                if sig_idx >= tc.aggregate_signature.signatures.len() {
561                    return false;
562                }
563                if !Verifier::verify(
564                    &verifier,
565                    pk,
566                    &bytes,
567                    &tc.aggregate_signature.signatures[sig_idx],
568                ) {
569                    return false;
570                }
571                sig_idx += 1;
572                verified_count += 1;
573            }
574            if sig_idx != tc.aggregate_signature.signatures.len() {
575                return false;
576            }
577            // Quorum check: require > 2/3 of validators (by count, not power —
578            // full power-based check is done in engine::verify_message).
579            verified_count * 3 > n * 2
580        }
581        ConsensusMessage::StatusCert {
582            validator,
583            signature,
584            locked_qc,
585            ..
586        } => {
587            // StatusCert signing bytes require current_view which we don't have
588            // in the relay context. Verify the sender is a known validator and
589            // the signature is over *some* plausible view (the TC view is not
590            // available here). The engine does full verification with correct view.
591            // At minimum, reject unknown validators.
592            let Some(pk) = validator_keys.get(validator) else {
593                return false;
594            };
595            // We cannot construct exact signing bytes without knowing current_view,
596            // so we accept from known validators only. The engine's verify_message
597            // will do full cryptographic verification.
598            let _ = (pk, signature, locked_qc);
599            true
600        }
601        ConsensusMessage::Evidence(_) => {
602            // Evidence gossip: accept from any known validator.
603            // The engine verifies the proof internally.
604            validator_keys.contains_key(&sender)
605        }
606    }
607}
608
609impl ConsensusEngine {
610    /// Epoch numbers to try when verifying signatures. During epoch transitions,
611    /// some nodes may still be in the previous epoch. We try the current epoch
612    /// first, then fall back to epoch - 1 to tolerate the transition window.
613    fn verification_epochs(&self) -> [EpochNumber; 2] {
614        let cur = self.state.current_epoch.number;
615        let prev = if cur.as_u64() > 0 {
616            EpochNumber(cur.as_u64() - 1)
617        } else {
618            cur
619        };
620        [cur, prev]
621    }
622
623    /// Verify the cryptographic signature on an inbound consensus message.
624    /// Returns false (and logs a warning) if verification fails.
625    /// Messages from past views are skipped (they'll be dropped by handle_message anyway).
626    ///
627    /// Crypto-heavy paths (aggregate signature verification) are run via
628    /// `tokio::task::block_in_place` so the async event loop remains responsive
629    /// while Ed25519 batch verification runs on the current OS thread.
630    fn verify_message(&self, msg: &ConsensusMessage) -> bool {
631        // Skip verification for non-Propose past-view messages — these may have
632        // been signed by a previous epoch's validator set. They'll be dropped by
633        // view checks. Propose messages are always verified because they may still
634        // be stored (for chain continuity in fast-forward).
635        let msg_view = match msg {
636            ConsensusMessage::Propose { .. } => None, // always verify proposals
637            ConsensusMessage::VoteMsg(v) | ConsensusMessage::Vote2Msg(v) => Some(v.view),
638            ConsensusMessage::Prepare { certificate, .. } => Some(certificate.view),
639            ConsensusMessage::Wish { target_view, .. } => Some(*target_view),
640            ConsensusMessage::TimeoutCert(tc) => Some(ViewNumber(tc.view.as_u64() + 1)),
641            ConsensusMessage::StatusCert { .. } => None,
642            ConsensusMessage::Evidence(_) => None, // always accept evidence
643        };
644        if let Some(v) = msg_view
645            && v < self.state.current_view
646        {
647            return true; // will be dropped by handler
648        }
649
650        let vs = &self.state.validator_set;
651        match msg {
652            ConsensusMessage::Propose {
653                block,
654                justify,
655                signature,
656                ..
657            } => {
658                let proposer = vs.get(block.proposer);
659                let Some(vi) = proposer else {
660                    warn!(proposer = %block.proposer, "propose from unknown validator");
661                    return false;
662                };
663                let mut proposal_ok = false;
664                for epoch in self.verification_epochs() {
665                    let bytes = view_protocol::proposal_signing_bytes(
666                        &self.state.chain_id_hash,
667                        epoch,
668                        block,
669                        justify,
670                    );
671                    if self.verifier.verify(&vi.public_key, &bytes, signature) {
672                        proposal_ok = true;
673                        break;
674                    }
675                }
676                if !proposal_ok {
677                    warn!(proposer = %block.proposer, "invalid proposal signature");
678                    return false;
679                }
680                // Verify justify QC aggregate signature (skip genesis QC which has no signers)
681                if justify.aggregate_signature.count() > 0 {
682                    let qc_bytes = Vote::signing_bytes(
683                        &self.state.chain_id_hash,
684                        justify.epoch,
685                        justify.view,
686                        &justify.block_hash,
687                        VoteType::Vote,
688                    );
689                    if !self
690                        .verifier
691                        .verify_aggregate(vs, &qc_bytes, &justify.aggregate_signature)
692                    {
693                        warn!(proposer = %block.proposer, "invalid justify QC aggregate signature");
694                        return false;
695                    }
696                    if justify.epoch == self.state.current_epoch.number
697                        && !hotmint_crypto::has_quorum(vs, &justify.aggregate_signature)
698                    {
699                        warn!(proposer = %block.proposer, "justify QC below quorum threshold");
700                        return false;
701                    }
702                }
703                true
704            }
705            ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
706                let Some(vi) = vs.get(vote.validator) else {
707                    warn!(validator = %vote.validator, "vote from unknown validator");
708                    return false;
709                };
710                let mut ok = false;
711                for epoch in self.verification_epochs() {
712                    let bytes = Vote::signing_bytes(
713                        &self.state.chain_id_hash,
714                        epoch,
715                        vote.view,
716                        &vote.block_hash,
717                        vote.vote_type,
718                    );
719                    if self
720                        .verifier
721                        .verify(&vi.public_key, &bytes, &vote.signature)
722                    {
723                        ok = true;
724                        break;
725                    }
726                }
727                if !ok {
728                    warn!(validator = %vote.validator, "invalid vote signature");
729                    return false;
730                }
731                true
732            }
733            ConsensusMessage::Prepare {
734                certificate,
735                signature,
736            } => {
737                // Verify the leader's signature on the prepare message
738                let Some(leader) = vs.leader_for_view(certificate.view) else {
739                    return false;
740                };
741                let mut prepare_ok = false;
742                for epoch in self.verification_epochs() {
743                    let bytes = view_protocol::prepare_signing_bytes(
744                        &self.state.chain_id_hash,
745                        epoch,
746                        certificate,
747                    );
748                    if self.verifier.verify(&leader.public_key, &bytes, signature) {
749                        prepare_ok = true;
750                        break;
751                    }
752                }
753                if !prepare_ok {
754                    warn!(view = %certificate.view, "invalid prepare signature");
755                    return false;
756                }
757                // Also verify the QC's aggregate signature and quorum
758                let qc_bytes = Vote::signing_bytes(
759                    &self.state.chain_id_hash,
760                    certificate.epoch,
761                    certificate.view,
762                    &certificate.block_hash,
763                    VoteType::Vote,
764                );
765                if !self
766                    .verifier
767                    .verify_aggregate(vs, &qc_bytes, &certificate.aggregate_signature)
768                {
769                    warn!(view = %certificate.view, "invalid QC aggregate signature");
770                    return false;
771                }
772                if certificate.epoch == self.state.current_epoch.number
773                    && !hotmint_crypto::has_quorum(vs, &certificate.aggregate_signature)
774                {
775                    warn!(view = %certificate.view, "Prepare QC below quorum threshold");
776                    return false;
777                }
778                true
779            }
780            ConsensusMessage::Wish {
781                target_view,
782                validator,
783                highest_qc,
784                signature,
785            } => {
786                let Some(vi) = vs.get(*validator) else {
787                    warn!(validator = %validator, "wish from unknown validator");
788                    return false;
789                };
790                // Signing bytes bind both target_view and highest_qc to prevent replay.
791                let mut wish_ok = false;
792                for epoch in self.verification_epochs() {
793                    let bytes = crate::pacemaker::wish_signing_bytes(
794                        &self.state.chain_id_hash,
795                        epoch,
796                        *target_view,
797                        highest_qc.as_ref(),
798                    );
799                    if self.verifier.verify(&vi.public_key, &bytes, signature) {
800                        wish_ok = true;
801                        break;
802                    }
803                }
804                if !wish_ok {
805                    warn!(validator = %validator, "invalid wish signature");
806                    return false;
807                }
808                true
809            }
810            ConsensusMessage::TimeoutCert(tc) => {
811                // The TC's aggregate signature is a collection of individual Ed25519 signatures,
812                // each signed over wish_signing_bytes(target_view, signer_highest_qc).
813                // Because each validator may have a different highest_qc, we verify per-signer
814                // using tc.highest_qcs[i] (indexed by validator slot).
815                // This also enforces quorum: we sum voting power of verified signers.
816                let target_view = ViewNumber(tc.view.as_u64() + 1);
817                let n = vs.validator_count();
818                if tc.aggregate_signature.signers.len() != n {
819                    warn!(view = %tc.view, "TC signers bitfield length mismatch");
820                    return false;
821                }
822                let mut sig_idx = 0usize;
823                let mut power = 0u64;
824                for (i, &signed) in tc.aggregate_signature.signers.iter().enumerate() {
825                    if !signed {
826                        continue;
827                    }
828                    let Some(vi) = vs.validators().get(i) else {
829                        warn!(view = %tc.view, validator_idx = i, "TC signer index out of validator set");
830                        return false;
831                    };
832                    let hqc = tc.highest_qcs.get(i).and_then(|h| h.as_ref());
833                    if sig_idx >= tc.aggregate_signature.signatures.len() {
834                        warn!(view = %tc.view, "TC aggregate_signature has fewer sigs than signers");
835                        return false;
836                    }
837                    let mut tc_sig_ok = false;
838                    for epoch in self.verification_epochs() {
839                        let bytes = crate::pacemaker::wish_signing_bytes(
840                            &self.state.chain_id_hash,
841                            epoch,
842                            target_view,
843                            hqc,
844                        );
845                        if self.verifier.verify(
846                            &vi.public_key,
847                            &bytes,
848                            &tc.aggregate_signature.signatures[sig_idx],
849                        ) {
850                            tc_sig_ok = true;
851                            break;
852                        }
853                    }
854                    if !tc_sig_ok {
855                        warn!(view = %tc.view, validator = %vi.id, "TC signer signature invalid");
856                        return false;
857                    }
858                    power += vs.power_of(vi.id);
859                    sig_idx += 1;
860                }
861                if sig_idx != tc.aggregate_signature.signatures.len() {
862                    warn!(view = %tc.view, "TC has extra signatures beyond bitfield");
863                    return false;
864                }
865                if power < vs.quorum_threshold() {
866                    warn!(view = %tc.view, power, threshold = vs.quorum_threshold(), "TC insufficient quorum");
867                    return false;
868                }
869                true
870            }
871            ConsensusMessage::StatusCert {
872                locked_qc,
873                validator,
874                signature,
875            } => {
876                let Some(vi) = vs.get(*validator) else {
877                    warn!(validator = %validator, "status from unknown validator");
878                    return false;
879                };
880                let mut status_ok = false;
881                for epoch in self.verification_epochs() {
882                    let bytes = view_protocol::status_signing_bytes(
883                        &self.state.chain_id_hash,
884                        epoch,
885                        self.state.current_view,
886                        locked_qc,
887                    );
888                    if self.verifier.verify(&vi.public_key, &bytes, signature) {
889                        status_ok = true;
890                        break;
891                    }
892                }
893                if !status_ok {
894                    warn!(validator = %validator, "invalid status signature");
895                    return false;
896                }
897                true
898            }
899            ConsensusMessage::Evidence(_) => {
900                // Evidence gossip does not carry an outer signature;
901                // the proof itself contains the conflicting vote signatures
902                // which are verified by the application layer.
903                true
904            }
905        }
906    }
907
908    async fn handle_message(
909        &mut self,
910        sender: Option<ValidatorId>,
911        msg: ConsensusMessage,
912    ) -> Result<()> {
913        // B-3: Per-sender rate limit before CPU-intensive crypto verification.
914        // Allow up to 100 messages per second per sender.
915        const MAX_MSG_PER_SEC: u32 = 100;
916        if let Some(vid) = sender {
917            let now = std::time::Instant::now();
918            let entry = self.msg_rate_limiter.entry(vid).or_insert((now, 0));
919            if now.duration_since(entry.0).as_secs() >= 1 {
920                *entry = (now, 1);
921            } else {
922                entry.1 += 1;
923                if entry.1 > MAX_MSG_PER_SEC {
924                    return Ok(());
925                }
926            }
927        }
928
929        // Run signature verification in a blocking context so that the tokio
930        // event loop is not stalled by CPU-intensive Ed25519 batch operations.
931        // block_in_place yields the current thread to the scheduler while the
932        // blocking work runs, keeping timers and I/O tasks responsive.
933        let verified = tokio::task::block_in_place(|| self.verify_message(&msg));
934        if !verified {
935            return Ok(());
936        }
937
938        match msg {
939            ConsensusMessage::Propose {
940                block,
941                justify,
942                double_cert,
943                signature: _,
944            } => {
945                let block = *block;
946                let justify = *justify;
947                let double_cert = double_cert.map(|dc| *dc);
948
949                // If proposal is from a future view, advance to it first
950                if block.view > self.state.current_view {
951                    if let Some(ref dc) = double_cert {
952                        if !tokio::task::block_in_place(|| self.validate_double_cert(dc)) {
953                            return Ok(());
954                        }
955
956                        // Fast-forward via double cert
957                        self.apply_commit(dc, "fast-forward").await;
958                        self.state.highest_double_cert = Some(dc.clone());
959                        self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()))
960                            .await;
961                    } else {
962                        return Ok(());
963                    }
964                } else if block.view < self.state.current_view {
965                    // Still store blocks from past views if we haven't committed
966                    // that height yet. This handles the case where fast-forward
967                    // advanced our view but we missed storing the block from the
968                    // earlier proposal. Without this, chain commits that walk
969                    // the parent chain would fail with "block not found".
970                    if block.height > self.state.last_committed_height {
971                        // Verify block hash before storing past-view blocks
972                        let expected = hotmint_crypto::compute_block_hash(&block);
973                        if block.hash == expected {
974                            let mut store = self.store.write();
975                            store.put_block(block);
976                        }
977                    }
978                    return Ok(());
979                }
980
981                let mut store = self.store.write();
982
983                // R-25: verify any DoubleCert in the same-view proposal path.
984                // The future-view path already calls validate_double_cert; the same-view path
985                // passes the DC straight to on_proposal → try_commit without verification.
986                // A Byzantine leader could inject a forged DC to trigger incorrect commits.
987                if let Some(ref dc) = double_cert
988                    && !tokio::task::block_in_place(|| self.validate_double_cert(dc))
989                {
990                    return Ok(());
991                }
992
993                // R-28: persist justify QC as commit evidence for the block it certifies.
994                // When blocks are committed via the 2-chain rule (possibly multiple blocks at
995                // once), the innermost block gets its own commit QC, but ancestor blocks only
996                // get the chain-rule commit and have no stored QC.  Storing the justify QC here
997                // ensures that sync responders can later serve those ancestor blocks with proof.
998                if justify.aggregate_signature.count() > 0
999                    && let Some(justified_block) = store.get_block(&justify.block_hash)
1000                    && store.get_commit_qc(justified_block.height).is_none()
1001                {
1002                    store.put_commit_qc(justified_block.height, justify.clone());
1003                }
1004
1005                // WAL: log commit intent before fast-forward commit in on_proposal.
1006                if let Some(ref dc) = double_cert
1007                    && let Some(ref mut wal) = self.wal
1008                    && let Some(target_block) = store.get_block(&dc.inner_qc.block_hash)
1009                    && let Err(e) = wal.log_commit_intent(target_block.height)
1010                {
1011                    warn!(error = %e, "WAL: failed to log commit intent for fast-forward");
1012                }
1013
1014                let proposal_result = view_protocol::on_proposal(
1015                    &mut self.state,
1016                    view_protocol::ProposalData {
1017                        block,
1018                        justify,
1019                        double_cert,
1020                    },
1021                    store.as_mut(),
1022                    self.network.as_ref(),
1023                    self.app.as_ref(),
1024                    self.signer.as_ref(),
1025                )
1026                .c(d!())?;
1027                drop(store);
1028
1029                // Process fast-forward commit result (WAL, tx indexing,
1030                // evidence marking, liveness tracking, persist_state).
1031                if let Some(result) = proposal_result.commit_result {
1032                    self.process_commit_result(&result);
1033                }
1034                if let Some(epoch) = proposal_result.pending_epoch {
1035                    self.pending_epoch = Some(epoch);
1036                }
1037            }
1038
1039            ConsensusMessage::VoteMsg(vote) => {
1040                if vote.view != self.state.current_view {
1041                    return Ok(());
1042                }
1043                if !self.state.is_leader() {
1044                    return Ok(());
1045                }
1046                if vote.vote_type != VoteType::Vote {
1047                    return Ok(());
1048                }
1049
1050                let result = self
1051                    .vote_collector
1052                    .add_vote(
1053                        &self.state.validator_set,
1054                        vote,
1055                        self.state.current_epoch.number,
1056                    )
1057                    .c(d!())?;
1058                self.handle_equivocation(&result);
1059                if let Some(qc) = result.qc {
1060                    self.on_qc_formed(qc).await;
1061                }
1062            }
1063
1064            ConsensusMessage::Prepare {
1065                certificate,
1066                signature: _,
1067            } => {
1068                if certificate.view < self.state.current_view {
1069                    return Ok(());
1070                }
1071                if certificate.view == self.state.current_view {
1072                    // Validate the Prepare's block app_hash if we have the block in
1073                    // store. Prevents locking onto a block whose app_hash diverges from
1074                    // our local state. When the block is absent (node caught up via TC),
1075                    // we defer to the QC's 2f+1 signatures for safety.
1076                    let store = self.store.read();
1077                    let block_opt = store.get_block(&certificate.block_hash);
1078                    if self.app.tracks_app_hash()
1079                        && let Some(ref block) = block_opt
1080                        && block.app_hash != self.state.last_app_hash
1081                    {
1082                        warn!(
1083                            block_app_hash = %block.app_hash,
1084                            local_app_hash = %self.state.last_app_hash,
1085                            "prepare block app_hash mismatch, ignoring"
1086                        );
1087                        return Ok(());
1088                    }
1089
1090                    // Generate vote extension for Vote2 (ABCI++ Vote Extensions).
1091                    // Only if we have the block available and have voting power.
1092                    let vote_extension = block_opt.and_then(|block| {
1093                        let ctx = BlockContext {
1094                            height: block.height,
1095                            view: self.state.current_view,
1096                            proposer: block.proposer,
1097                            epoch: self.state.current_epoch.number,
1098                            epoch_start_view: self.state.current_epoch.start_view,
1099                            validator_set: &self.state.validator_set,
1100                            timestamp: block.timestamp,
1101                            vote_extensions: vec![],
1102                        };
1103                        self.app.extend_vote(&block, &ctx)
1104                    });
1105                    drop(store);
1106
1107                    view_protocol::on_prepare(
1108                        &mut self.state,
1109                        certificate,
1110                        self.network.as_ref(),
1111                        self.signer.as_ref(),
1112                        vote_extension,
1113                    );
1114                }
1115            }
1116
1117            ConsensusMessage::Vote2Msg(vote) => {
1118                if vote.view != self.state.current_view {
1119                    return Ok(());
1120                }
1121                if vote.vote_type != VoteType::Vote2 {
1122                    return Ok(());
1123                }
1124
1125                // Verify vote extension (ABCI++ Vote Extensions) if present.
1126                if let Some(ref ext) = vote.extension
1127                    && !self
1128                        .app
1129                        .verify_vote_extension(ext, &vote.block_hash, vote.validator)
1130                {
1131                    warn!(
1132                        validator = %vote.validator,
1133                        view = %vote.view,
1134                        "rejecting vote2: invalid vote extension"
1135                    );
1136                    return Ok(());
1137                }
1138
1139                let result = self
1140                    .vote_collector
1141                    .add_vote(
1142                        &self.state.validator_set,
1143                        vote,
1144                        self.state.current_epoch.number,
1145                    )
1146                    .c(d!())?;
1147                self.handle_equivocation(&result);
1148                if let Some(outer_qc) = result.qc {
1149                    self.on_double_cert_formed(outer_qc, result.extensions)
1150                        .await;
1151                }
1152            }
1153
1154            ConsensusMessage::Wish {
1155                target_view,
1156                validator,
1157                highest_qc,
1158                signature,
1159            } => {
1160                // Validate carried highest_qc (C4 mitigation).
1161                // Both signature authenticity and 2f+1 quorum weight must pass.
1162                if let Some(ref qc) = highest_qc
1163                    && qc.aggregate_signature.count() > 0
1164                {
1165                    let qc_bytes = Vote::signing_bytes(
1166                        &self.state.chain_id_hash,
1167                        qc.epoch,
1168                        qc.view,
1169                        &qc.block_hash,
1170                        VoteType::Vote,
1171                    );
1172                    if !tokio::task::block_in_place(|| {
1173                        self.verifier.verify_aggregate(
1174                            &self.state.validator_set,
1175                            &qc_bytes,
1176                            &qc.aggregate_signature,
1177                        )
1178                    }) {
1179                        warn!(validator = %validator, "wish carries invalid highest_qc signature");
1180                        return Ok(());
1181                    }
1182                    // Only enforce quorum against the current validator set if the
1183                    // QC was formed in the current epoch. A QC from a previous epoch
1184                    // may not meet the new set's quorum threshold (e.g., after a
1185                    // validator power change), but its signatures were already verified
1186                    // above, so it remains a valid proof of finality in its own epoch.
1187                    if qc.epoch == self.state.current_epoch.number
1188                        && !hotmint_crypto::has_quorum(
1189                            &self.state.validator_set,
1190                            &qc.aggregate_signature,
1191                        )
1192                    {
1193                        warn!(validator = %validator, "wish carries highest_qc without quorum");
1194                        return Ok(());
1195                    }
1196                }
1197
1198                if let Some(tc) = self.pacemaker.add_wish(
1199                    &self.state.validator_set,
1200                    target_view,
1201                    validator,
1202                    highest_qc,
1203                    signature,
1204                ) {
1205                    info!(
1206                        validator = %self.state.validator_id,
1207                        view = %tc.view,
1208                        "TC formed, advancing view"
1209                    );
1210                    self.network
1211                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1212                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1213                }
1214            }
1215
1216            ConsensusMessage::TimeoutCert(tc) => {
1217                if self.pacemaker.should_relay_tc(&tc) {
1218                    self.network
1219                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1220                }
1221                let new_view = ViewNumber(tc.view.as_u64() + 1);
1222                if new_view > self.state.current_view {
1223                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1224                }
1225            }
1226
1227            ConsensusMessage::StatusCert {
1228                locked_qc,
1229                validator,
1230                signature: _,
1231            } => {
1232                if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
1233                    if let Some(ref qc) = locked_qc {
1234                        self.state.update_highest_qc(qc);
1235                    }
1236                    self.status_senders.insert(validator);
1237                    let status_power: u64 = self
1238                        .status_senders
1239                        .iter()
1240                        .map(|v| self.state.validator_set.power_of(*v))
1241                        .sum();
1242                    // Leader's own power counts toward quorum
1243                    let total_power =
1244                        status_power + self.state.validator_set.power_of(self.state.validator_id);
1245                    if total_power >= self.state.validator_set.quorum_threshold() {
1246                        self.try_propose().await;
1247                    }
1248                }
1249            }
1250
1251            ConsensusMessage::Evidence(proof) => {
1252                // C-6: Cryptographically verify evidence before accepting.
1253                // Both signatures must be valid from the alleged validator for
1254                // different block hashes at the same (view, vote_type).
1255                let vs = &self.state.validator_set;
1256                let vi = match vs.get(proof.validator) {
1257                    Some(vi) => vi,
1258                    None => {
1259                        warn!(validator = %proof.validator, "evidence for unknown validator");
1260                        return Ok(());
1261                    }
1262                };
1263                if proof.block_hash_a == proof.block_hash_b {
1264                    warn!(validator = %proof.validator, "evidence has identical block hashes");
1265                    return Ok(());
1266                }
1267                // Use the epoch from the proof itself (not local epoch) so
1268                // cross-epoch evidence can be verified correctly.
1269                let bytes_a = Vote::signing_bytes(
1270                    &self.state.chain_id_hash,
1271                    proof.epoch,
1272                    proof.view,
1273                    &proof.block_hash_a,
1274                    proof.vote_type,
1275                );
1276                let bytes_b = Vote::signing_bytes(
1277                    &self.state.chain_id_hash,
1278                    proof.epoch,
1279                    proof.view,
1280                    &proof.block_hash_b,
1281                    proof.vote_type,
1282                );
1283                if !self
1284                    .verifier
1285                    .verify(&vi.public_key, &bytes_a, &proof.signature_a)
1286                    || !self
1287                        .verifier
1288                        .verify(&vi.public_key, &bytes_b, &proof.signature_b)
1289                {
1290                    warn!(validator = %proof.validator, "evidence has invalid signatures");
1291                    return Ok(());
1292                }
1293
1294                info!(
1295                    validator = %proof.validator,
1296                    view = %proof.view,
1297                    "received valid evidence gossip"
1298                );
1299                if let Err(e) = self.app.on_evidence(&proof) {
1300                    warn!(error = %e, "on_evidence callback failed for gossiped proof");
1301                }
1302                if let Some(ref mut store) = self.evidence_store {
1303                    store.put_evidence(proof);
1304                }
1305            }
1306        }
1307        Ok(())
1308    }
1309
1310    fn handle_equivocation(&mut self, result: &crate::vote_collector::VoteResult) {
1311        if let Some(ref proof) = result.equivocation {
1312            warn!(
1313                validator = %proof.validator,
1314                view = %proof.view,
1315                "equivocation detected!"
1316            );
1317            if let Err(e) = self.app.on_evidence(proof) {
1318                warn!(error = %e, "on_evidence callback failed");
1319            }
1320            self.network.broadcast_evidence(proof);
1321            if let Some(ref mut store) = self.evidence_store {
1322                store.put_evidence(proof.clone());
1323            }
1324        }
1325    }
1326
1327    async fn on_qc_formed(&mut self, qc: QuorumCertificate) {
1328        // Save the QC so we can reliably pair it when forming a DoubleCert
1329        self.current_view_qc = Some(qc.clone());
1330
1331        view_protocol::on_votes_collected(
1332            &mut self.state,
1333            qc.clone(),
1334            self.network.as_ref(),
1335            self.signer.as_ref(),
1336        );
1337
1338        // Leader also does vote2 for its own prepare (self-vote for step 5)
1339        // Generate vote extension (ABCI++ Vote Extensions) if the block is available.
1340        let vote_extension = {
1341            let store = self.store.read();
1342            store.get_block(&qc.block_hash).and_then(|block| {
1343                let ctx = BlockContext {
1344                    height: block.height,
1345                    view: self.state.current_view,
1346                    proposer: block.proposer,
1347                    epoch: self.state.current_epoch.number,
1348                    epoch_start_view: self.state.current_epoch.start_view,
1349                    validator_set: &self.state.validator_set,
1350                    timestamp: block.timestamp,
1351                    vote_extensions: vec![],
1352                };
1353                self.app.extend_vote(&block, &ctx)
1354            })
1355        };
1356        let vote_bytes = Vote::signing_bytes(
1357            &self.state.chain_id_hash,
1358            self.state.current_epoch.number,
1359            self.state.current_view,
1360            &qc.block_hash,
1361            VoteType::Vote2,
1362        );
1363        let signature = self.signer.sign(&vote_bytes);
1364        let vote = Vote {
1365            block_hash: qc.block_hash,
1366            view: self.state.current_view,
1367            validator: self.state.validator_id,
1368            signature,
1369            vote_type: VoteType::Vote2,
1370            extension: vote_extension,
1371        };
1372
1373        // Lock on this QC
1374        self.state.update_locked_qc(&qc);
1375
1376        let next_leader_id =
1377            leader::next_leader(&self.state.validator_set, self.state.current_view);
1378        if next_leader_id == self.state.validator_id {
1379            // We are the next leader, collect vote2 locally
1380            match self.vote_collector.add_vote(
1381                &self.state.validator_set,
1382                vote,
1383                self.state.current_epoch.number,
1384            ) {
1385                Ok(result) => {
1386                    self.handle_equivocation(&result);
1387                    if let Some(outer_qc) = result.qc {
1388                        self.on_double_cert_formed(outer_qc, result.extensions)
1389                            .await;
1390                    }
1391                }
1392                Err(e) => warn!(error = %e, "failed to add self vote2"),
1393            }
1394        } else {
1395            self.network
1396                .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
1397        }
1398    }
1399
1400    async fn on_double_cert_formed(
1401        &mut self,
1402        outer_qc: QuorumCertificate,
1403        extensions: Vec<(ValidatorId, Vec<u8>)>,
1404    ) {
1405        // Use the QC we explicitly saved from this view's first voting round
1406        let inner_qc = match self.current_view_qc.take() {
1407            Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
1408            _ => {
1409                // Fallback to locked_qc or highest_qc
1410                match &self.state.locked_qc {
1411                    Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
1412                    _ => match &self.state.highest_qc {
1413                        Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
1414                        _ => {
1415                            warn!(
1416                                validator = %self.state.validator_id,
1417                                "double cert formed but can't find matching inner QC"
1418                            );
1419                            return;
1420                        }
1421                    },
1422                }
1423            }
1424        };
1425
1426        let dc = DoubleCertificate {
1427            inner_qc,
1428            outer_qc,
1429            vote_extensions: extensions,
1430        };
1431
1432        info!(
1433            validator = %self.state.validator_id,
1434            view = %self.state.current_view,
1435            hash = %dc.inner_qc.block_hash,
1436            "double certificate formed, committing"
1437        );
1438
1439        // Commit
1440        self.apply_commit(&dc, "double-cert").await;
1441
1442        self.state.highest_double_cert = Some(dc.clone());
1443
1444        // Advance to next view — as new leader, include DC in proposal
1445        self.advance_view(ViewEntryTrigger::DoubleCert(dc)).await;
1446    }
1447
1448    async fn handle_timeout(&mut self) {
1449        // Skip wish building/signing entirely when we have no voting power (fullnodes).
1450        // build_wish involves a cryptographic signing operation that serves no purpose
1451        // when the wish will never be broadcast or counted toward a TC.
1452        let has_power = self.state.validator_set.power_of(self.state.validator_id) > 0;
1453        if !has_power {
1454            self.pacemaker.on_timeout();
1455            return;
1456        }
1457
1458        info!(
1459            validator = %self.state.validator_id,
1460            view = %self.state.current_view,
1461            "view timeout, sending wish"
1462        );
1463
1464        let wish = self.pacemaker.build_wish(
1465            &self.state.chain_id_hash,
1466            self.state.current_epoch.number,
1467            self.state.current_view,
1468            self.state.validator_id,
1469            self.state.highest_qc.clone(),
1470            self.signer.as_ref(),
1471        );
1472
1473        self.network.broadcast(wish.clone());
1474
1475        // Also process our own wish
1476        if let ConsensusMessage::Wish {
1477            target_view,
1478            validator,
1479            highest_qc,
1480            signature,
1481        } = wish
1482            && let Some(tc) = self.pacemaker.add_wish(
1483                &self.state.validator_set,
1484                target_view,
1485                validator,
1486                highest_qc,
1487                signature,
1488            )
1489        {
1490            self.network
1491                .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1492            self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1493            return;
1494        }
1495
1496        // Exponential backoff on repeated timeouts
1497        self.pacemaker.on_timeout();
1498    }
1499
1500    /// Post-commit processing: store commit QCs, index txs, mark evidence,
1501    /// track liveness, persist state, and log WAL done.
1502    /// Shared by both `apply_commit` (normal DC path) and the on_proposal
1503    /// fast-forward path to ensure all commit side-effects happen consistently.
1504    fn process_commit_result(&mut self, result: &CommitResult) {
1505        if result.committed_blocks.is_empty() {
1506            return;
1507        }
1508        {
1509            let mut s = self.store.write();
1510            for (i, block) in result.committed_blocks.iter().enumerate() {
1511                if result.commit_qc.block_hash == block.hash {
1512                    s.put_commit_qc(block.height, result.commit_qc.clone());
1513                }
1514                let txs = crate::commit::decode_payload(&block.payload);
1515                for (tx_idx, tx) in txs.iter().enumerate() {
1516                    let tx_hash = *blake3::hash(tx).as_bytes();
1517                    s.put_tx_index(tx_hash, block.height, tx_idx as u32);
1518                }
1519                if let Some(resp) = result.block_responses.get(i) {
1520                    s.put_block_results(block.height, resp.clone());
1521                }
1522            }
1523            s.flush();
1524        }
1525        if let Some(ref mut ev_store) = self.evidence_store {
1526            for block in &result.committed_blocks {
1527                for proof in &block.evidence {
1528                    ev_store.mark_committed(proof.view, proof.validator);
1529                }
1530                for proof in ev_store.get_pending() {
1531                    if proof.view <= block.view {
1532                        ev_store.mark_committed(proof.view, proof.validator);
1533                    }
1534                }
1535            }
1536        }
1537        self.liveness_tracker.record_commit(
1538            &self.state.validator_set,
1539            &result.commit_qc.aggregate_signature.signers,
1540        );
1541        self.persist_state();
1542        if let Some(ref mut wal) = self.wal
1543            && let Err(e) = wal.log_commit_done(self.state.last_committed_height)
1544        {
1545            warn!(error = %e, "WAL: failed to log commit done");
1546        }
1547    }
1548
1549    /// Apply the result of a successful try_commit: update app_hash, pending epoch,
1550    /// store commit QCs, and flush. Called from both normal and fast-forward commit paths.
1551    async fn apply_commit(&mut self, dc: &DoubleCertificate, context: &str) {
1552        // WAL: log commit intent before executing blocks.
1553        if let Some(ref mut wal) = self.wal {
1554            let target_height = {
1555                let store = self.store.read();
1556                store.get_block(&dc.inner_qc.block_hash).map(|b| b.height)
1557            };
1558            if let Some(h) = target_height
1559                && let Err(e) = wal.log_commit_intent(h)
1560            {
1561                warn!(error = %e, "WAL: failed to log commit intent");
1562            }
1563        }
1564
1565        let store = self.store.read();
1566        match try_commit(
1567            dc,
1568            store.as_ref(),
1569            self.app.as_ref(),
1570            &mut self.state.last_committed_height,
1571            &self.state.current_epoch,
1572        ) {
1573            Ok(result) => {
1574                if !result.committed_blocks.is_empty() {
1575                    self.state.last_app_hash = result.last_app_hash;
1576                }
1577                if result.pending_epoch.is_some() {
1578                    self.pending_epoch = result.pending_epoch.clone();
1579                }
1580                drop(store);
1581                self.process_commit_result(&result);
1582            }
1583            Err(e) => {
1584                warn!(error = %e, "try_commit failed during {context}");
1585                drop(store);
1586            }
1587        }
1588    }
1589
1590    /// Cryptographically validate a DoubleCertificate:
1591    /// 1. inner and outer QC must reference the same block hash
1592    /// 2. inner QC aggregate signature (Vote1) must be valid and reach quorum
1593    /// 3. outer QC aggregate signature (Vote2) must be valid and reach quorum
1594    ///
1595    /// Note on quorum and epoch transitions: DCs are always formed in the same epoch as
1596    /// the block they commit (vote_collector enforces quorum at formation time).  When
1597    /// a DC is received by a node that has already transitioned to a new epoch, the
1598    /// validator set may differ.  We enforce quorum against the current validator set
1599    /// as the best available reference; a legitimate DC from a prior epoch should still
1600    /// satisfy quorum against the new set unless the set shrank significantly.
1601    fn validate_double_cert(&self, dc: &DoubleCertificate) -> bool {
1602        if dc.inner_qc.block_hash != dc.outer_qc.block_hash {
1603            warn!("double cert inner/outer block_hash mismatch");
1604            return false;
1605        }
1606        let vs = &self.state.validator_set;
1607        let inner_bytes = Vote::signing_bytes(
1608            &self.state.chain_id_hash,
1609            dc.inner_qc.epoch,
1610            dc.inner_qc.view,
1611            &dc.inner_qc.block_hash,
1612            VoteType::Vote,
1613        );
1614        if !self
1615            .verifier
1616            .verify_aggregate(vs, &inner_bytes, &dc.inner_qc.aggregate_signature)
1617        {
1618            warn!("double cert inner QC signature invalid");
1619            return false;
1620        }
1621        if !hotmint_crypto::has_quorum(vs, &dc.inner_qc.aggregate_signature) {
1622            warn!("double cert inner QC below quorum threshold");
1623            return false;
1624        }
1625        let outer_bytes = Vote::signing_bytes(
1626            &self.state.chain_id_hash,
1627            dc.outer_qc.epoch,
1628            dc.outer_qc.view,
1629            &dc.outer_qc.block_hash,
1630            VoteType::Vote2,
1631        );
1632        if !self
1633            .verifier
1634            .verify_aggregate(vs, &outer_bytes, &dc.outer_qc.aggregate_signature)
1635        {
1636            warn!("double cert outer QC signature invalid");
1637            return false;
1638        }
1639        if !hotmint_crypto::has_quorum(vs, &dc.outer_qc.aggregate_signature) {
1640            warn!("double cert outer QC below quorum threshold");
1641            return false;
1642        }
1643        true
1644    }
1645
1646    fn persist_state(&mut self) {
1647        if let Some(p) = self.persistence.as_mut() {
1648            p.save_current_view(self.state.current_view);
1649            if let Some(ref qc) = self.state.locked_qc {
1650                p.save_locked_qc(qc);
1651            }
1652            if let Some(ref qc) = self.state.highest_qc {
1653                p.save_highest_qc(qc);
1654            }
1655            p.save_last_committed_height(self.state.last_committed_height);
1656            p.save_current_epoch(&self.state.current_epoch);
1657            p.save_last_app_hash(self.state.last_app_hash);
1658            p.save_pending_epoch(self.pending_epoch.as_ref());
1659            p.flush();
1660        }
1661    }
1662
1663    async fn advance_view(&mut self, trigger: ViewEntryTrigger) {
1664        let new_view = match &trigger {
1665            ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
1666            ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
1667            ViewEntryTrigger::Genesis => ViewNumber(1),
1668        };
1669        self.advance_view_to(new_view, trigger).await;
1670    }
1671
1672    async fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
1673        if new_view <= self.state.current_view {
1674            return;
1675        }
1676
1677        // Reset backoff on successful progress (DoubleCert path)
1678        let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
1679
1680        // Capture vote extensions from DoubleCertificate for the next create_payload.
1681        if let ViewEntryTrigger::DoubleCert(ref dc) = trigger {
1682            self.state.pending_vote_extensions = dc.vote_extensions.clone();
1683        } else {
1684            self.state.pending_vote_extensions.clear();
1685        }
1686
1687        self.vote_collector.clear_view(self.state.current_view);
1688        self.vote_collector.prune_before(self.state.current_view);
1689        self.pacemaker.clear_view(self.state.current_view);
1690        self.pacemaker.prune_before(self.state.current_view);
1691        self.status_senders.clear();
1692        self.current_view_qc = None;
1693
1694        // Epoch transition: apply pending validator set change when we reach the
1695        // epoch's start_view. The start_view is set deterministically (commit_view + 2)
1696        // so all honest nodes apply the transition at the same view.
1697        if self
1698            .pending_epoch
1699            .as_ref()
1700            .is_some_and(|e| new_view >= e.start_view)
1701        {
1702            // SAFETY: we just verified `pending_epoch` is `Some` above.
1703            let Some(new_epoch) = self.pending_epoch.take() else {
1704                unreachable!("pending_epoch was Some in the condition check");
1705            };
1706            info!(
1707                validator = %self.state.validator_id,
1708                old_epoch = %self.state.current_epoch.number,
1709                new_epoch = %new_epoch.number,
1710                start_view = %new_epoch.start_view,
1711                validators = new_epoch.validator_set.validator_count(),
1712                "epoch transition"
1713            );
1714            // Report offline validators to the application before transitioning.
1715            let offline = self.liveness_tracker.offline_validators();
1716            if !offline.is_empty() {
1717                let evidence: Vec<OfflineEvidence> = offline
1718                    .iter()
1719                    .map(|&(validator, missed, total)| OfflineEvidence {
1720                        validator,
1721                        missed_commits: missed,
1722                        total_commits: total,
1723                        evidence_height: self.state.last_committed_height,
1724                    })
1725                    .collect();
1726                info!(
1727                    offline_count = evidence.len(),
1728                    epoch = %self.state.current_epoch.number,
1729                    "reporting offline validators"
1730                );
1731                if let Err(e) = self.app.on_offline_validators(&evidence) {
1732                    warn!(error = %e, "on_offline_validators callback failed");
1733                }
1734            }
1735            self.liveness_tracker.reset();
1736
1737            self.state.validator_set = new_epoch.validator_set.clone();
1738            self.state.current_epoch = new_epoch;
1739            // Notify network layer of the new validator set and epoch
1740            self.network
1741                .on_epoch_change(self.state.current_epoch.number, &self.state.validator_set);
1742            // Full clear: old votes/wishes are from the previous epoch's validator set
1743            self.vote_collector = VoteCollector::new();
1744            self.pacemaker = Pacemaker::with_config(self.pacemaker_config.clone());
1745        }
1746
1747        view_protocol::enter_view(
1748            &mut self.state,
1749            new_view,
1750            trigger,
1751            self.network.as_ref(),
1752            self.signer.as_ref(),
1753        );
1754
1755        if is_progress {
1756            self.pacemaker.reset_on_progress();
1757        } else {
1758            self.pacemaker.reset_timer();
1759        }
1760
1761        self.persist_state();
1762
1763        // If we're the leader, propose immediately.
1764        // Note: in a full implementation, the leader would collect StatusCerts
1765        // before proposing (status_senders quorum gate). Currently the immediate
1766        // propose path is required for liveness across epoch transitions where
1767        // cross-epoch verification complexity can stall status collection.
1768        if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
1769            self.try_propose().await;
1770        }
1771    }
1772}
1773
1774// ---------------------------------------------------------------------------
1775// Regression tests for sub-quorum certificate injection (R-29, R-32)
1776// ---------------------------------------------------------------------------
1777
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;

    use parking_lot::RwLock;

    use hotmint_crypto::{Ed25519Signer, Ed25519Verifier};
    use hotmint_types::Signer as SignerTrait;
    use hotmint_types::certificate::QuorumCertificate;
    use hotmint_types::crypto::AggregateSignature;
    use hotmint_types::epoch::EpochNumber;
    use hotmint_types::validator::{ValidatorId, ValidatorInfo};
    use hotmint_types::vote::{Vote, VoteType};
    use tokio::sync::mpsc;

    use crate::application::NoopApplication;
    use crate::network::NetworkSink;
    use crate::state::ConsensusState;
    use crate::store::MemoryBlockStore;

    /// Network sink for unit tests: every outgoing message is dropped.
    struct DevNullNetwork;
    impl NetworkSink for DevNullNetwork {
        fn broadcast(&self, _: ConsensusMessage) {}
        fn send_to(&self, _: ValidatorId, _: ConsensusMessage) {}
    }

    /// Chain-id hash matching `ConsensusState::new()`, which uses chain_id = "".
    fn test_chain_id_hash() -> [u8; 32] {
        *blake3::hash(b"").as_bytes()
    }

    /// Four validators (ids 0..4) with one unit of power each, plus their keys.
    fn make_validator_set_4() -> (ValidatorSet, Vec<Ed25519Signer>) {
        let keys: Vec<Ed25519Signer> = (0..4)
            .map(|id| Ed25519Signer::generate(ValidatorId(id)))
            .collect();
        let members: Vec<ValidatorInfo> = keys
            .iter()
            .map(|key| ValidatorInfo {
                id: key.validator_id(),
                public_key: key.public_key(),
                power: 1,
            })
            .collect();
        (ValidatorSet::new(members), keys)
    }

    /// Build an engine over an in-memory store with no optional subsystems.
    /// Returns the engine and the sender half of its inbound message channel.
    fn make_test_engine(
        vid: ValidatorId,
        vs: ValidatorSet,
        signer: Ed25519Signer,
    ) -> (
        ConsensusEngine,
        mpsc::Sender<(Option<ValidatorId>, ConsensusMessage)>,
    ) {
        let (inbox_tx, inbox_rx) = mpsc::channel(64);
        let block_store: Box<dyn crate::store::BlockStore> = Box::new(MemoryBlockStore::new());
        let config = EngineConfig {
            verifier: Box::new(Ed25519Verifier),
            pacemaker: None,
            persistence: None,
            evidence_store: None,
            wal: None,
            pending_epoch: None,
        };
        let engine = ConsensusEngine::new(
            ConsensusState::new(vid, vs),
            Arc::new(RwLock::new(block_store)),
            Box::new(DevNullNetwork),
            Box::new(NoopApplication),
            Box::new(signer),
            inbox_rx,
            config,
        );
        (engine, inbox_tx)
    }

    /// Aggregate `VoteType::Vote` signatures over (epoch 0, `view`, `hash`) from
    /// the validators at `indices`, sized for the 4-validator test set.
    fn vote_aggregate(
        signers: &[Ed25519Signer],
        indices: &[usize],
        view: ViewNumber,
        hash: &BlockHash,
    ) -> AggregateSignature {
        let chain_id_hash = test_chain_id_hash();
        let payload = Vote::signing_bytes(&chain_id_hash, EpochNumber(0), view, hash, VoteType::Vote);
        let mut agg = AggregateSignature::new(4);
        for &idx in indices {
            agg.add(idx, SignerTrait::sign(&signers[idx], &payload))
                .unwrap();
        }
        agg
    }

    /// Epoch-0 QC for (`view`, `hash`) signed by the validators at `indices`.
    fn quorum_cert(
        signers: &[Ed25519Signer],
        indices: &[usize],
        view: ViewNumber,
        hash: BlockHash,
    ) -> QuorumCertificate {
        QuorumCertificate {
            block_hash: hash,
            view,
            aggregate_signature: vote_aggregate(signers, indices, view, &hash),
            epoch: EpochNumber(0),
        }
    }

    /// A height-1, view-1 proposal from V1 carrying `justify`, signed by V1.
    fn propose_from_v1(signers: &[Ed25519Signer], justify: QuorumCertificate) -> ConsensusMessage {
        let mut block = Block::genesis();
        block.height = Height(1);
        block.view = ViewNumber(1);
        block.proposer = ValidatorId(1);
        block.hash = block.compute_hash();

        let chain_id_hash = test_chain_id_hash();
        let to_sign = crate::view_protocol::proposal_signing_bytes(
            &chain_id_hash,
            EpochNumber(0),
            &block,
            &justify,
        );
        let signature = SignerTrait::sign(&signers[1], &to_sign);

        ConsensusMessage::Propose {
            block: Box::new(block),
            justify: Box::new(justify),
            double_cert: None,
            signature,
        }
    }

    // R-29 regression: a Propose message whose justify QC is signed by fewer than
    // 2f+1 validators must be rejected by verify_message().
    #[test]
    fn r29_propose_sub_quorum_justify_rejected_by_verify_message() {
        let (vs, signers) = make_validator_set_4();
        // The engine's own key is irrelevant: verify_message only consults the
        // engine's validator set and verifier.
        let (engine, _tx) =
            make_test_engine(ValidatorId(0), vs, Ed25519Signer::generate(ValidatorId(0)));

        // Only V1 signs: 1 of 4 is below 2f+1 = 3.
        let justify = quorum_cert(&signers, &[1], ViewNumber::GENESIS, BlockHash::GENESIS);
        let msg = propose_from_v1(&signers, justify);

        assert!(
            !engine.verify_message(&msg),
            "R-29 regression: Propose with sub-quorum justify QC must be rejected by verify_message"
        );
    }

    // R-29 regression: a Propose message with a full quorum justify QC (3/4) must pass.
    #[test]
    fn r29_propose_full_quorum_justify_accepted_by_verify_message() {
        let (vs, signers) = make_validator_set_4();
        let (engine, _tx) =
            make_test_engine(ValidatorId(0), vs, Ed25519Signer::generate(ValidatorId(0)));

        // V0, V1, V2 sign: 3 of 4 meets 2f+1.
        let justify = quorum_cert(&signers, &[0, 1, 2], ViewNumber::GENESIS, BlockHash::GENESIS);
        let msg = propose_from_v1(&signers, justify);

        assert!(
            engine.verify_message(&msg),
            "R-29: Propose with full quorum justify QC must pass verify_message"
        );
    }

    // R-32 regression: a Wish carrying a sub-quorum highest_qc must cause
    // verify_highest_qc_in_wish to treat the QC as invalid and return false,
    // so handle_message discards the Wish without forwarding it to the pacemaker.
    //
    // Only the sub-component is exercised here: has_quorum must return false for
    // a 1-of-4 aggregate (and true for 3-of-4), which is what that guard relies on.
    #[test]
    fn r32_sub_quorum_highest_qc_fails_has_quorum() {
        let (vs, signers) = make_validator_set_4();
        let hash = BlockHash([1u8; 32]);
        let view = ViewNumber(1);

        let lone = vote_aggregate(&signers, &[0], view, &hash);
        assert!(
            !hotmint_crypto::has_quorum(&vs, &lone),
            "R-32 regression: 1-of-4 signed QC must not satisfy has_quorum"
        );

        let three = vote_aggregate(&signers, &[0, 1, 2], view, &hash);
        assert!(
            hotmint_crypto::has_quorum(&vs, &three),
            "R-32: 3-of-4 signed QC must satisfy has_quorum"
        );
    }
}