// hotmint_consensus/engine.rs

1use ruc::*;
2
3use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5
6use crate::application::Application;
7use crate::commit::{CommitResult, try_commit};
8use crate::evidence_store::EvidenceStore;
9use crate::leader;
10use crate::liveness::{LivenessTracker, OfflineEvidence};
11use crate::network::NetworkSink;
12use crate::pacemaker::{Pacemaker, PacemakerConfig};
13use crate::state::{ConsensusState, ViewStep};
14use crate::store::BlockStore;
15use crate::view_protocol::{self, ViewEntryTrigger};
16use crate::vote_collector::VoteCollector;
17
18use hotmint_types::epoch::Epoch;
19use hotmint_types::vote::VoteType;
20use hotmint_types::*;
21use tokio::sync::mpsc;
22use tracing::{info, warn};
23
/// Shared block store type used by the engine, RPC, and sync responder.
///
/// A reference-counted (`Arc`) handle to a boxed [`BlockStore`] guarded by a
/// `parking_lot::RwLock`. The engine takes the write lock while proposing.
pub type SharedBlockStore = Arc<parking_lot::RwLock<Box<dyn BlockStore>>>;
26
/// Trait for persisting critical consensus state across restarts.
///
/// Implementors must be `Send`; the engine stores one as
/// `Option<Box<dyn StatePersistence>>`. Every `save_*` method returns `()`,
/// so a failed write cannot be reported to the caller through this interface.
pub trait StatePersistence: Send {
    /// Record `view` as the current view.
    fn save_current_view(&mut self, view: ViewNumber);
    /// Record `qc` as the locked quorum certificate.
    fn save_locked_qc(&mut self, qc: &QuorumCertificate);
    /// Record `qc` as the highest known quorum certificate.
    fn save_highest_qc(&mut self, qc: &QuorumCertificate);
    /// Record `height` as the last committed height.
    fn save_last_committed_height(&mut self, height: Height);
    /// Record `epoch` as the current epoch.
    fn save_current_epoch(&mut self, epoch: &Epoch);
    /// Record `hash` as the last application hash.
    fn save_last_app_hash(&mut self, hash: BlockHash);
    /// Flush previously saved values.
    ///
    /// NOTE(review): takes `&self` while the `save_*` methods take `&mut self`;
    /// the durability guarantee (e.g. fsync) is up to the implementor — confirm.
    fn flush(&self);
}
37
/// Write-Ahead Log trait for commit crash recovery.
///
/// Implementors must be `Send`; the engine stores one as `Option<Box<dyn Wal>>`.
/// Both methods surface I/O failures to the caller as `std::io::Error`.
pub trait Wal: Send {
    /// Log intent to commit blocks up to `target_height`. Must fsync before returning.
    ///
    /// # Errors
    /// Returns the underlying I/O error if the intent could not be logged.
    fn log_commit_intent(&mut self, target_height: Height) -> std::io::Result<()>;
    /// Log that commit succeeded. May truncate the WAL.
    ///
    /// # Errors
    /// Returns the underlying I/O error if the completion record could not be logged.
    fn log_commit_done(&mut self, target_height: Height) -> std::io::Result<()>;
}
45
/// The consensus engine: owns the consensus state plus every collaborator
/// (store, network, application, signer, verifier, pacemaker, ...) and drives
/// them from a single event loop (see `run`).
///
/// Construct it with `ConsensusEngine::new` or `ConsensusEngineBuilder`.
pub struct ConsensusEngine {
    /// Mutable consensus state (current view, epoch, validator set, highest QC, ...).
    state: ConsensusState,
    /// Block store shared with other components; write-locked while proposing.
    store: SharedBlockStore,
    /// Outbound network handle passed to the view protocol.
    network: Box<dyn NetworkSink>,
    /// The application; queried via `info()` at startup and passed to `propose`.
    app: Box<dyn Application>,
    /// Signs this validator's own messages (e.g. the leader's self vote).
    signer: Box<dyn Signer>,
    /// Verifies signatures on inbound consensus messages.
    verifier: Box<dyn Verifier>,
    /// Accumulates votes; `add_vote` may yield a formed QC.
    vote_collector: VoteCollector,
    /// View timer; its deadline drives the timeout branch of the event loop.
    pacemaker: Pacemaker,
    /// The configuration the pacemaker was built from (kept alongside it).
    pacemaker_config: PacemakerConfig,
    /// Inbound messages as `(optional sender, message)` pairs.
    msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
    /// Collected unique status cert senders (for leader, per view)
    status_senders: HashSet<ValidatorId>,
    /// The QC formed in this view's first voting round (used to build DoubleCert)
    current_view_qc: Option<QuorumCertificate>,
    /// Pending epoch transition (set by try_commit, applied in advance_view_to)
    pending_epoch: Option<Epoch>,
    /// Optional state persistence (for crash recovery).
    persistence: Option<Box<dyn StatePersistence>>,
    /// Optional evidence store for persisting equivocation proofs.
    evidence_store: Option<Box<dyn EvidenceStore>>,
    /// Tracks per-validator liveness for offline slashing.
    liveness_tracker: LivenessTracker,
    /// Optional write-ahead log for commit crash recovery.
    wal: Option<Box<dyn Wal>>,
}
72
/// Configuration for ConsensusEngine.
///
/// Only `verifier` is mandatory; every other field is optional and may be
/// left as `None`.
pub struct EngineConfig {
    /// Signature verifier used for inbound consensus messages.
    pub verifier: Box<dyn Verifier>,
    /// Pacemaker configuration; `None` makes the engine use `PacemakerConfig::default()`.
    pub pacemaker: Option<PacemakerConfig>,
    /// Optional state persistence backend (for crash recovery).
    pub persistence: Option<Box<dyn StatePersistence>>,
    /// Optional evidence store for persisting equivocation proofs.
    pub evidence_store: Option<Box<dyn EvidenceStore>>,
    /// Optional write-ahead log for commit crash recovery.
    pub wal: Option<Box<dyn Wal>>,
}
81
82impl EngineConfig {
83    /// Create an `EngineConfig` with the given verifier and defaults
84    /// (no custom pacemaker, no persistence, no evidence store).
85    pub fn new(verifier: Box<dyn Verifier>) -> Self {
86        Self {
87            verifier,
88            pacemaker: None,
89            persistence: None,
90            evidence_store: None,
91            wal: None,
92        }
93    }
94
95    /// Set a custom pacemaker configuration.
96    pub fn with_pacemaker(mut self, pacemaker: PacemakerConfig) -> Self {
97        self.pacemaker = Some(pacemaker);
98        self
99    }
100
101    /// Set a state persistence backend.
102    pub fn with_persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
103        self.persistence = Some(persistence);
104        self
105    }
106}
107
/// Builder for constructing a `ConsensusEngine` with a fluent API.
///
/// `state`, `store`, `network`, `app`, `signer`, `msg_rx` and `verifier` are
/// required: `build` returns an error if any of them is unset. `pacemaker`,
/// `persistence`, `evidence_store` and `wal` are optional.
///
/// # Example
/// ```rust,ignore
/// let engine = ConsensusEngineBuilder::new()
///     .state(state)
///     .store(store)
///     .network(network)
///     .app(app)
///     .signer(signer)
///     .messages(msg_rx)
///     .verifier(verifier)
///     .build()
///     .expect("all required fields must be set");
/// ```
pub struct ConsensusEngineBuilder {
    // Required components (checked by `build`).
    state: Option<ConsensusState>,
    store: Option<SharedBlockStore>,
    network: Option<Box<dyn NetworkSink>>,
    app: Option<Box<dyn Application>>,
    signer: Option<Box<dyn Signer>>,
    msg_rx: Option<mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>>,
    verifier: Option<Box<dyn Verifier>>,
    // Optional components (forwarded to `EngineConfig` as-is).
    pacemaker: Option<PacemakerConfig>,
    persistence: Option<Box<dyn StatePersistence>>,
    evidence_store: Option<Box<dyn EvidenceStore>>,
    wal: Option<Box<dyn Wal>>,
}
136
137impl ConsensusEngineBuilder {
138    pub fn new() -> Self {
139        Self {
140            state: None,
141            store: None,
142            network: None,
143            app: None,
144            signer: None,
145            msg_rx: None,
146            verifier: None,
147            pacemaker: None,
148            persistence: None,
149            evidence_store: None,
150            wal: None,
151        }
152    }
153
154    pub fn state(mut self, state: ConsensusState) -> Self {
155        self.state = Some(state);
156        self
157    }
158
159    pub fn store(mut self, store: SharedBlockStore) -> Self {
160        self.store = Some(store);
161        self
162    }
163
164    pub fn network(mut self, network: Box<dyn NetworkSink>) -> Self {
165        self.network = Some(network);
166        self
167    }
168
169    pub fn app(mut self, app: Box<dyn Application>) -> Self {
170        self.app = Some(app);
171        self
172    }
173
174    pub fn signer(mut self, signer: Box<dyn Signer>) -> Self {
175        self.signer = Some(signer);
176        self
177    }
178
179    pub fn messages(
180        mut self,
181        msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
182    ) -> Self {
183        self.msg_rx = Some(msg_rx);
184        self
185    }
186
187    pub fn verifier(mut self, verifier: Box<dyn Verifier>) -> Self {
188        self.verifier = Some(verifier);
189        self
190    }
191
192    pub fn pacemaker(mut self, config: PacemakerConfig) -> Self {
193        self.pacemaker = Some(config);
194        self
195    }
196
197    pub fn persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
198        self.persistence = Some(persistence);
199        self
200    }
201
202    pub fn evidence_store(mut self, store: Box<dyn EvidenceStore>) -> Self {
203        self.evidence_store = Some(store);
204        self
205    }
206
207    pub fn wal(mut self, wal: Box<dyn Wal>) -> Self {
208        self.wal = Some(wal);
209        self
210    }
211
212    pub fn build(self) -> ruc::Result<ConsensusEngine> {
213        let state = self.state.ok_or_else(|| ruc::eg!("state is required"))?;
214        let store = self.store.ok_or_else(|| ruc::eg!("store is required"))?;
215        let network = self
216            .network
217            .ok_or_else(|| ruc::eg!("network is required"))?;
218        let app = self.app.ok_or_else(|| ruc::eg!("app is required"))?;
219        let signer = self.signer.ok_or_else(|| ruc::eg!("signer is required"))?;
220        let msg_rx = self
221            .msg_rx
222            .ok_or_else(|| ruc::eg!("messages (msg_rx) is required"))?;
223        let verifier = self
224            .verifier
225            .ok_or_else(|| ruc::eg!("verifier is required"))?;
226
227        let config = EngineConfig {
228            verifier,
229            pacemaker: self.pacemaker,
230            persistence: self.persistence,
231            evidence_store: self.evidence_store,
232            wal: self.wal,
233        };
234
235        Ok(ConsensusEngine::new(
236            state, store, network, app, signer, msg_rx, config,
237        ))
238    }
239}
240
241impl Default for ConsensusEngineBuilder {
242    fn default() -> Self {
243        Self::new()
244    }
245}
246
247impl ConsensusEngine {
248    pub fn new(
249        state: ConsensusState,
250        store: SharedBlockStore,
251        network: Box<dyn NetworkSink>,
252        app: Box<dyn Application>,
253        signer: Box<dyn Signer>,
254        msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
255        config: EngineConfig,
256    ) -> Self {
257        let pc = config.pacemaker.unwrap_or_default();
258        Self {
259            state,
260            store,
261            network,
262            app,
263            signer,
264            verifier: config.verifier,
265            vote_collector: VoteCollector::new(),
266            pacemaker: Pacemaker::with_config(pc.clone()),
267            pacemaker_config: pc,
268            msg_rx,
269            status_senders: HashSet::new(),
270            current_view_qc: None,
271            pending_epoch: None,
272            persistence: config.persistence,
273            evidence_store: config.evidence_store,
274            liveness_tracker: LivenessTracker::new(),
275            wal: config.wal,
276        }
277    }
278
279    /// Bootstrap and start the event loop.
280    /// If persisted state was restored (current_view > 1), skip genesis bootstrap.
281    pub async fn run(mut self) {
282        // Check application info against consensus state for divergence detection.
283        let app_info = self.app.info();
284        if app_info.last_block_height.as_u64() > 0
285            && app_info.last_block_height != self.state.last_committed_height
286        {
287            warn!(
288                app_height = app_info.last_block_height.as_u64(),
289                consensus_height = self.state.last_committed_height.as_u64(),
290                "application height differs from consensus state — possible state divergence"
291            );
292        }
293
294        if self.state.current_view.as_u64() <= 1 {
295            self.enter_genesis_view().await;
296        } else {
297            info!(
298                validator = %self.state.validator_id,
299                view = %self.state.current_view,
300                height = %self.state.last_committed_height,
301                "resuming from persisted state"
302            );
303            self.pacemaker.reset_timer();
304        }
305
306        loop {
307            let deadline = self.pacemaker.sleep_until_deadline();
308            tokio::pin!(deadline);
309
310            tokio::select! {
311                Some((sender, msg)) = self.msg_rx.recv() => {
312                    if let Err(e) = self.handle_message(sender, msg).await {
313                        warn!(validator = %self.state.validator_id, error = %e, "error handling message");
314                    }
315                }
316                _ = &mut deadline => {
317                    self.handle_timeout().await;
318                }
319            }
320        }
321    }
322
323    async fn enter_genesis_view(&mut self) {
324        // Create a synthetic genesis QC so the first leader can propose
325        let genesis_qc = QuorumCertificate {
326            block_hash: BlockHash::GENESIS,
327            view: ViewNumber::GENESIS,
328            aggregate_signature: AggregateSignature::new(
329                self.state.validator_set.validator_count(),
330            ),
331            epoch: self.state.current_epoch.number,
332        };
333        self.state.highest_qc = Some(genesis_qc);
334
335        let view = ViewNumber(1);
336        view_protocol::enter_view(
337            &mut self.state,
338            view,
339            ViewEntryTrigger::Genesis,
340            self.network.as_ref(),
341            self.signer.as_ref(),
342        );
343        self.pacemaker.reset_timer();
344
345        // If leader of genesis view, propose immediately
346        if self.state.is_leader() {
347            self.state.step = ViewStep::WaitingForStatus;
348            // In genesis, skip status wait — propose directly
349            self.try_propose().await;
350        }
351    }
352
353    fn try_propose(
354        &mut self,
355    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
356        Box::pin(async move {
357            // Collect pending evidence to embed in the block (C-3).
358            let pending_evidence = self
359                .evidence_store
360                .as_ref()
361                .map(|s| s.get_pending())
362                .unwrap_or_default();
363
364            // Propose is synchronous; acquire, run, and release the lock before any await.
365            let proposed_block = {
366                let mut store = self.store.write();
367                view_protocol::propose(
368                    &mut self.state,
369                    store.as_mut(),
370                    self.network.as_ref(),
371                    self.app.as_ref(),
372                    self.signer.as_ref(),
373                    pending_evidence,
374                )
375            }; // lock released here
376
377            match proposed_block {
378                Ok(block) => {
379                    // Leader votes for its own block
380                    self.leader_self_vote(block.hash).await;
381                }
382                Err(e) => {
383                    warn!(
384                        validator = %self.state.validator_id,
385                        error = %e,
386                        "failed to propose"
387                    );
388                }
389            }
390        })
391    }
392
393    async fn leader_self_vote(&mut self, block_hash: BlockHash) {
394        let vote_bytes = Vote::signing_bytes(
395            &self.state.chain_id_hash,
396            self.state.current_epoch.number,
397            self.state.current_view,
398            &block_hash,
399            VoteType::Vote,
400        );
401        let signature = self.signer.sign(&vote_bytes);
402        let vote = Vote {
403            block_hash,
404            view: self.state.current_view,
405            validator: self.state.validator_id,
406            signature,
407            vote_type: VoteType::Vote,
408            extension: None,
409        };
410        match self.vote_collector.add_vote(
411            &self.state.validator_set,
412            vote,
413            self.state.current_epoch.number,
414        ) {
415            Ok(result) => {
416                self.handle_equivocation(&result);
417                if let Some(qc) = result.qc {
418                    self.on_qc_formed(qc).await;
419                }
420            }
421            Err(e) => warn!(error = %e, "failed to add self vote"),
422        }
423    }
424}
425
426/// Verify the per-sender individual signature on a consensus message before relaying.
427///
428/// Only checks signatures that can be attributed to a single known validator using the
429/// provided key map.  Aggregate certificates (TimeoutCert, justify/prepare QCs) are
430/// intentionally not fully re-verified here — the receiving engine always does the full
431/// check.  Messages whose signing bytes depend on receiver state (StatusCert needs
432/// `current_view`) are also allowed through; the engine will reject them if invalid.
433///
434/// `ordered_validators` is the validator list in round-robin order (same order used by
435/// `ValidatorSet::leader_for_view`).  Pass an empty slice to skip the leader-for-view
436/// check (e.g., in tests where the set is not available).
437///
438/// Returns `false` when:
439/// - The claimed sender is not in `validator_keys` (unknown/non-validator peer), OR
440/// - The individual signature is cryptographically invalid, OR
441/// - For Prepare: the sender is not the leader for the certificate's view.
442pub fn verify_relay_sender(
443    sender: ValidatorId,
444    msg: &ConsensusMessage,
445    validator_keys: &HashMap<ValidatorId, hotmint_types::crypto::PublicKey>,
446    ordered_validators: &[ValidatorId],
447    chain_id_hash: &[u8; 32],
448    epoch: hotmint_types::epoch::EpochNumber,
449) -> bool {
450    use hotmint_crypto::Ed25519Verifier;
451    use hotmint_types::Verifier;
452    use hotmint_types::vote::Vote;
453    let verifier = Ed25519Verifier;
454    match msg {
455        ConsensusMessage::Propose {
456            block,
457            justify,
458            signature,
459            ..
460        } => {
461            let Some(pk) = validator_keys.get(&block.proposer) else {
462                return false;
463            };
464            let bytes =
465                crate::view_protocol::proposal_signing_bytes(chain_id_hash, epoch, block, justify);
466            Verifier::verify(&verifier, pk, &bytes, signature)
467        }
468        ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
469            let Some(pk) = validator_keys.get(&vote.validator) else {
470                return false;
471            };
472            let bytes = Vote::signing_bytes(
473                chain_id_hash,
474                epoch,
475                vote.view,
476                &vote.block_hash,
477                vote.vote_type,
478            );
479            Verifier::verify(&verifier, pk, &bytes, &vote.signature)
480        }
481        ConsensusMessage::Prepare {
482            certificate,
483            signature,
484        } => {
485            // Prepare is broadcast by the current leader. Verify that the relay
486            // sender is the leader for this view, then check the signature.
487            if !ordered_validators.is_empty() {
488                let n = ordered_validators.len();
489                let expected_leader = ordered_validators[certificate.view.as_u64() as usize % n];
490                if sender != expected_leader {
491                    return false;
492                }
493            }
494            let Some(pk) = validator_keys.get(&sender) else {
495                return false;
496            };
497            let bytes =
498                crate::view_protocol::prepare_signing_bytes(chain_id_hash, epoch, certificate);
499            Verifier::verify(&verifier, pk, &bytes, signature)
500        }
501        ConsensusMessage::Wish {
502            target_view,
503            validator,
504            highest_qc,
505            signature,
506        } => {
507            let Some(pk) = validator_keys.get(validator) else {
508                return false;
509            };
510            let bytes = crate::pacemaker::wish_signing_bytes(
511                chain_id_hash,
512                epoch,
513                *target_view,
514                highest_qc.as_ref(),
515            );
516            Verifier::verify(&verifier, pk, &bytes, signature)
517        }
518        ConsensusMessage::TimeoutCert(tc) => {
519            // Full relay verification: verify each signer's signature + quorum check.
520            let target_view = ViewNumber(tc.view.as_u64() + 1);
521            let n = ordered_validators.len();
522            if n == 0 || tc.aggregate_signature.signers.len() != n {
523                return false;
524            }
525            let mut sig_idx = 0usize;
526            let mut verified_count = 0usize;
527            for (i, &signed) in tc.aggregate_signature.signers.iter().enumerate() {
528                if !signed {
529                    continue;
530                }
531                if i >= n {
532                    return false;
533                }
534                let vid = ordered_validators[i];
535                let Some(pk) = validator_keys.get(&vid) else {
536                    return false;
537                };
538                let hqc = tc.highest_qcs.get(i).and_then(|h| h.as_ref());
539                let bytes =
540                    crate::pacemaker::wish_signing_bytes(chain_id_hash, epoch, target_view, hqc);
541                if sig_idx >= tc.aggregate_signature.signatures.len() {
542                    return false;
543                }
544                if !Verifier::verify(
545                    &verifier,
546                    pk,
547                    &bytes,
548                    &tc.aggregate_signature.signatures[sig_idx],
549                ) {
550                    return false;
551                }
552                sig_idx += 1;
553                verified_count += 1;
554            }
555            if sig_idx != tc.aggregate_signature.signatures.len() {
556                return false;
557            }
558            // Quorum check: require > 2/3 of validators (by count, not power —
559            // full power-based check is done in engine::verify_message).
560            verified_count * 3 > n * 2
561        }
562        ConsensusMessage::StatusCert {
563            validator,
564            signature,
565            locked_qc,
566            ..
567        } => {
568            // StatusCert signing bytes require current_view which we don't have
569            // in the relay context. Verify the sender is a known validator and
570            // the signature is over *some* plausible view (the TC view is not
571            // available here). The engine does full verification with correct view.
572            // At minimum, reject unknown validators.
573            let Some(pk) = validator_keys.get(validator) else {
574                return false;
575            };
576            // We cannot construct exact signing bytes without knowing current_view,
577            // so we accept from known validators only. The engine's verify_message
578            // will do full cryptographic verification.
579            let _ = (pk, signature, locked_qc);
580            true
581        }
582        ConsensusMessage::Evidence(_) => {
583            // Evidence gossip: accept from any known validator.
584            // The engine verifies the proof internally.
585            validator_keys.contains_key(&sender)
586        }
587    }
588}
589
590impl ConsensusEngine {
591    /// Epoch numbers to try when verifying signatures. During epoch transitions,
592    /// some nodes may still be in the previous epoch. We try the current epoch
593    /// first, then fall back to epoch - 1 to tolerate the transition window.
594    fn verification_epochs(&self) -> [EpochNumber; 2] {
595        let cur = self.state.current_epoch.number;
596        let prev = if cur.as_u64() > 0 {
597            EpochNumber(cur.as_u64() - 1)
598        } else {
599            cur
600        };
601        [cur, prev]
602    }
603
604    /// Verify the cryptographic signature on an inbound consensus message.
605    /// Returns false (and logs a warning) if verification fails.
606    /// Messages from past views are skipped (they'll be dropped by handle_message anyway).
607    ///
608    /// Crypto-heavy paths (aggregate signature verification) are run via
609    /// `tokio::task::block_in_place` so the async event loop remains responsive
610    /// while Ed25519 batch verification runs on the current OS thread.
611    fn verify_message(&self, msg: &ConsensusMessage) -> bool {
612        // Skip verification for non-Propose past-view messages — these may have
613        // been signed by a previous epoch's validator set. They'll be dropped by
614        // view checks. Propose messages are always verified because they may still
615        // be stored (for chain continuity in fast-forward).
616        let msg_view = match msg {
617            ConsensusMessage::Propose { .. } => None, // always verify proposals
618            ConsensusMessage::VoteMsg(v) | ConsensusMessage::Vote2Msg(v) => Some(v.view),
619            ConsensusMessage::Prepare { certificate, .. } => Some(certificate.view),
620            ConsensusMessage::Wish { target_view, .. } => Some(*target_view),
621            ConsensusMessage::TimeoutCert(tc) => Some(ViewNumber(tc.view.as_u64() + 1)),
622            ConsensusMessage::StatusCert { .. } => None,
623            ConsensusMessage::Evidence(_) => None, // always accept evidence
624        };
625        if let Some(v) = msg_view
626            && v < self.state.current_view
627        {
628            return true; // will be dropped by handler
629        }
630
631        let vs = &self.state.validator_set;
632        match msg {
633            ConsensusMessage::Propose {
634                block,
635                justify,
636                signature,
637                ..
638            } => {
639                let proposer = vs.get(block.proposer);
640                let Some(vi) = proposer else {
641                    warn!(proposer = %block.proposer, "propose from unknown validator");
642                    return false;
643                };
644                let mut proposal_ok = false;
645                for epoch in self.verification_epochs() {
646                    let bytes = view_protocol::proposal_signing_bytes(
647                        &self.state.chain_id_hash,
648                        epoch,
649                        block,
650                        justify,
651                    );
652                    if self.verifier.verify(&vi.public_key, &bytes, signature) {
653                        proposal_ok = true;
654                        break;
655                    }
656                }
657                if !proposal_ok {
658                    warn!(proposer = %block.proposer, "invalid proposal signature");
659                    return false;
660                }
661                // Verify justify QC aggregate signature (skip genesis QC which has no signers)
662                if justify.aggregate_signature.count() > 0 {
663                    let qc_bytes = Vote::signing_bytes(
664                        &self.state.chain_id_hash,
665                        justify.epoch,
666                        justify.view,
667                        &justify.block_hash,
668                        VoteType::Vote,
669                    );
670                    if !self
671                        .verifier
672                        .verify_aggregate(vs, &qc_bytes, &justify.aggregate_signature)
673                    {
674                        warn!(proposer = %block.proposer, "invalid justify QC aggregate signature");
675                        return false;
676                    }
677                    if justify.epoch == self.state.current_epoch.number
678                        && !hotmint_crypto::has_quorum(vs, &justify.aggregate_signature)
679                    {
680                        warn!(proposer = %block.proposer, "justify QC below quorum threshold");
681                        return false;
682                    }
683                }
684                true
685            }
686            ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
687                let Some(vi) = vs.get(vote.validator) else {
688                    warn!(validator = %vote.validator, "vote from unknown validator");
689                    return false;
690                };
691                let mut ok = false;
692                for epoch in self.verification_epochs() {
693                    let bytes = Vote::signing_bytes(
694                        &self.state.chain_id_hash,
695                        epoch,
696                        vote.view,
697                        &vote.block_hash,
698                        vote.vote_type,
699                    );
700                    if self
701                        .verifier
702                        .verify(&vi.public_key, &bytes, &vote.signature)
703                    {
704                        ok = true;
705                        break;
706                    }
707                }
708                if !ok {
709                    warn!(validator = %vote.validator, "invalid vote signature");
710                    return false;
711                }
712                true
713            }
714            ConsensusMessage::Prepare {
715                certificate,
716                signature,
717            } => {
718                // Verify the leader's signature on the prepare message
719                let Some(leader) = vs.leader_for_view(certificate.view) else {
720                    return false;
721                };
722                let mut prepare_ok = false;
723                for epoch in self.verification_epochs() {
724                    let bytes = view_protocol::prepare_signing_bytes(
725                        &self.state.chain_id_hash,
726                        epoch,
727                        certificate,
728                    );
729                    if self.verifier.verify(&leader.public_key, &bytes, signature) {
730                        prepare_ok = true;
731                        break;
732                    }
733                }
734                if !prepare_ok {
735                    warn!(view = %certificate.view, "invalid prepare signature");
736                    return false;
737                }
738                // Also verify the QC's aggregate signature and quorum
739                let qc_bytes = Vote::signing_bytes(
740                    &self.state.chain_id_hash,
741                    certificate.epoch,
742                    certificate.view,
743                    &certificate.block_hash,
744                    VoteType::Vote,
745                );
746                if !self
747                    .verifier
748                    .verify_aggregate(vs, &qc_bytes, &certificate.aggregate_signature)
749                {
750                    warn!(view = %certificate.view, "invalid QC aggregate signature");
751                    return false;
752                }
753                if certificate.epoch == self.state.current_epoch.number
754                    && !hotmint_crypto::has_quorum(vs, &certificate.aggregate_signature)
755                {
756                    warn!(view = %certificate.view, "Prepare QC below quorum threshold");
757                    return false;
758                }
759                true
760            }
761            ConsensusMessage::Wish {
762                target_view,
763                validator,
764                highest_qc,
765                signature,
766            } => {
767                let Some(vi) = vs.get(*validator) else {
768                    warn!(validator = %validator, "wish from unknown validator");
769                    return false;
770                };
771                // Signing bytes bind both target_view and highest_qc to prevent replay.
772                let mut wish_ok = false;
773                for epoch in self.verification_epochs() {
774                    let bytes = crate::pacemaker::wish_signing_bytes(
775                        &self.state.chain_id_hash,
776                        epoch,
777                        *target_view,
778                        highest_qc.as_ref(),
779                    );
780                    if self.verifier.verify(&vi.public_key, &bytes, signature) {
781                        wish_ok = true;
782                        break;
783                    }
784                }
785                if !wish_ok {
786                    warn!(validator = %validator, "invalid wish signature");
787                    return false;
788                }
789                true
790            }
791            ConsensusMessage::TimeoutCert(tc) => {
792                // The TC's aggregate signature is a collection of individual Ed25519 signatures,
793                // each signed over wish_signing_bytes(target_view, signer_highest_qc).
794                // Because each validator may have a different highest_qc, we verify per-signer
795                // using tc.highest_qcs[i] (indexed by validator slot).
796                // This also enforces quorum: we sum voting power of verified signers.
797                let target_view = ViewNumber(tc.view.as_u64() + 1);
798                let n = vs.validator_count();
799                if tc.aggregate_signature.signers.len() != n {
800                    warn!(view = %tc.view, "TC signers bitfield length mismatch");
801                    return false;
802                }
803                let mut sig_idx = 0usize;
804                let mut power = 0u64;
805                for (i, &signed) in tc.aggregate_signature.signers.iter().enumerate() {
806                    if !signed {
807                        continue;
808                    }
809                    let Some(vi) = vs.validators().get(i) else {
810                        warn!(view = %tc.view, validator_idx = i, "TC signer index out of validator set");
811                        return false;
812                    };
813                    let hqc = tc.highest_qcs.get(i).and_then(|h| h.as_ref());
814                    if sig_idx >= tc.aggregate_signature.signatures.len() {
815                        warn!(view = %tc.view, "TC aggregate_signature has fewer sigs than signers");
816                        return false;
817                    }
818                    let mut tc_sig_ok = false;
819                    for epoch in self.verification_epochs() {
820                        let bytes = crate::pacemaker::wish_signing_bytes(
821                            &self.state.chain_id_hash,
822                            epoch,
823                            target_view,
824                            hqc,
825                        );
826                        if self.verifier.verify(
827                            &vi.public_key,
828                            &bytes,
829                            &tc.aggregate_signature.signatures[sig_idx],
830                        ) {
831                            tc_sig_ok = true;
832                            break;
833                        }
834                    }
835                    if !tc_sig_ok {
836                        warn!(view = %tc.view, validator = %vi.id, "TC signer signature invalid");
837                        return false;
838                    }
839                    power += vs.power_of(vi.id);
840                    sig_idx += 1;
841                }
842                if sig_idx != tc.aggregate_signature.signatures.len() {
843                    warn!(view = %tc.view, "TC has extra signatures beyond bitfield");
844                    return false;
845                }
846                if power < vs.quorum_threshold() {
847                    warn!(view = %tc.view, power, threshold = vs.quorum_threshold(), "TC insufficient quorum");
848                    return false;
849                }
850                true
851            }
852            ConsensusMessage::StatusCert {
853                locked_qc,
854                validator,
855                signature,
856            } => {
857                let Some(vi) = vs.get(*validator) else {
858                    warn!(validator = %validator, "status from unknown validator");
859                    return false;
860                };
861                let mut status_ok = false;
862                for epoch in self.verification_epochs() {
863                    let bytes = view_protocol::status_signing_bytes(
864                        &self.state.chain_id_hash,
865                        epoch,
866                        self.state.current_view,
867                        locked_qc,
868                    );
869                    if self.verifier.verify(&vi.public_key, &bytes, signature) {
870                        status_ok = true;
871                        break;
872                    }
873                }
874                if !status_ok {
875                    warn!(validator = %validator, "invalid status signature");
876                    return false;
877                }
878                true
879            }
880            ConsensusMessage::Evidence(_) => {
881                // Evidence gossip does not carry an outer signature;
882                // the proof itself contains the conflicting vote signatures
883                // which are verified by the application layer.
884                true
885            }
886        }
887    }
888
889    async fn handle_message(
890        &mut self,
891        _sender: Option<ValidatorId>,
892        msg: ConsensusMessage,
893    ) -> Result<()> {
894        // Run signature verification in a blocking context so that the tokio
895        // event loop is not stalled by CPU-intensive Ed25519 batch operations.
896        // block_in_place yields the current thread to the scheduler while the
897        // blocking work runs, keeping timers and I/O tasks responsive.
898        let verified = tokio::task::block_in_place(|| self.verify_message(&msg));
899        if !verified {
900            return Ok(());
901        }
902
903        match msg {
904            ConsensusMessage::Propose {
905                block,
906                justify,
907                double_cert,
908                signature: _,
909            } => {
910                let block = *block;
911                let justify = *justify;
912                let double_cert = double_cert.map(|dc| *dc);
913
914                // If proposal is from a future view, advance to it first
915                if block.view > self.state.current_view {
916                    if let Some(ref dc) = double_cert {
917                        if !tokio::task::block_in_place(|| self.validate_double_cert(dc)) {
918                            return Ok(());
919                        }
920
921                        // Fast-forward via double cert
922                        self.apply_commit(dc, "fast-forward").await;
923                        self.state.highest_double_cert = Some(dc.clone());
924                        self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()))
925                            .await;
926                    } else {
927                        return Ok(());
928                    }
929                } else if block.view < self.state.current_view {
930                    // Still store blocks from past views if we haven't committed
931                    // that height yet. This handles the case where fast-forward
932                    // advanced our view but we missed storing the block from the
933                    // earlier proposal. Without this, chain commits that walk
934                    // the parent chain would fail with "block not found".
935                    if block.height > self.state.last_committed_height {
936                        // Verify block hash before storing past-view blocks
937                        let expected = hotmint_crypto::compute_block_hash(&block);
938                        if block.hash == expected {
939                            let mut store = self.store.write();
940                            store.put_block(block);
941                        }
942                    }
943                    return Ok(());
944                }
945
946                let mut store = self.store.write();
947
948                // R-25: verify any DoubleCert in the same-view proposal path.
949                // The future-view path already calls validate_double_cert; the same-view path
950                // passes the DC straight to on_proposal → try_commit without verification.
951                // A Byzantine leader could inject a forged DC to trigger incorrect commits.
952                if let Some(ref dc) = double_cert
953                    && !tokio::task::block_in_place(|| self.validate_double_cert(dc))
954                {
955                    return Ok(());
956                }
957
958                // R-28: persist justify QC as commit evidence for the block it certifies.
959                // When blocks are committed via the 2-chain rule (possibly multiple blocks at
960                // once), the innermost block gets its own commit QC, but ancestor blocks only
961                // get the chain-rule commit and have no stored QC.  Storing the justify QC here
962                // ensures that sync responders can later serve those ancestor blocks with proof.
963                if justify.aggregate_signature.count() > 0
964                    && let Some(justified_block) = store.get_block(&justify.block_hash)
965                    && store.get_commit_qc(justified_block.height).is_none()
966                {
967                    store.put_commit_qc(justified_block.height, justify.clone());
968                }
969
970                // WAL: log commit intent before fast-forward commit in on_proposal.
971                if let Some(ref dc) = double_cert
972                    && let Some(ref mut wal) = self.wal
973                    && let Some(target_block) = store.get_block(&dc.inner_qc.block_hash)
974                    && let Err(e) = wal.log_commit_intent(target_block.height)
975                {
976                    warn!(error = %e, "WAL: failed to log commit intent for fast-forward");
977                }
978
979                let proposal_result = view_protocol::on_proposal(
980                    &mut self.state,
981                    view_protocol::ProposalData {
982                        block,
983                        justify,
984                        double_cert,
985                    },
986                    store.as_mut(),
987                    self.network.as_ref(),
988                    self.app.as_ref(),
989                    self.signer.as_ref(),
990                )
991                .c(d!())?;
992                drop(store);
993
994                // Process fast-forward commit result (WAL, tx indexing,
995                // evidence marking, liveness tracking, persist_state).
996                if let Some(result) = proposal_result.commit_result {
997                    self.process_commit_result(&result);
998                }
999                if let Some(epoch) = proposal_result.pending_epoch {
1000                    self.pending_epoch = Some(epoch);
1001                }
1002            }
1003
1004            ConsensusMessage::VoteMsg(vote) => {
1005                if vote.view != self.state.current_view {
1006                    return Ok(());
1007                }
1008                if !self.state.is_leader() {
1009                    return Ok(());
1010                }
1011                if vote.vote_type != VoteType::Vote {
1012                    return Ok(());
1013                }
1014
1015                let result = self
1016                    .vote_collector
1017                    .add_vote(
1018                        &self.state.validator_set,
1019                        vote,
1020                        self.state.current_epoch.number,
1021                    )
1022                    .c(d!())?;
1023                self.handle_equivocation(&result);
1024                if let Some(qc) = result.qc {
1025                    self.on_qc_formed(qc).await;
1026                }
1027            }
1028
1029            ConsensusMessage::Prepare {
1030                certificate,
1031                signature: _,
1032            } => {
1033                if certificate.view < self.state.current_view {
1034                    return Ok(());
1035                }
1036                if certificate.view == self.state.current_view {
1037                    // Validate the Prepare's block app_hash if we have the block in
1038                    // store. Prevents locking onto a block whose app_hash diverges from
1039                    // our local state. When the block is absent (node caught up via TC),
1040                    // we defer to the QC's 2f+1 signatures for safety.
1041                    let store = self.store.read();
1042                    let block_opt = store.get_block(&certificate.block_hash);
1043                    if self.app.tracks_app_hash()
1044                        && let Some(ref block) = block_opt
1045                        && block.app_hash != self.state.last_app_hash
1046                    {
1047                        warn!(
1048                            block_app_hash = %block.app_hash,
1049                            local_app_hash = %self.state.last_app_hash,
1050                            "prepare block app_hash mismatch, ignoring"
1051                        );
1052                        return Ok(());
1053                    }
1054
1055                    // Generate vote extension for Vote2 (ABCI++ Vote Extensions).
1056                    // Only if we have the block available and have voting power.
1057                    let vote_extension = block_opt.and_then(|block| {
1058                        let ctx = BlockContext {
1059                            height: block.height,
1060                            view: self.state.current_view,
1061                            proposer: block.proposer,
1062                            epoch: self.state.current_epoch.number,
1063                            epoch_start_view: self.state.current_epoch.start_view,
1064                            validator_set: &self.state.validator_set,
1065                            vote_extensions: vec![],
1066                        };
1067                        self.app.extend_vote(&block, &ctx)
1068                    });
1069                    drop(store);
1070
1071                    view_protocol::on_prepare(
1072                        &mut self.state,
1073                        certificate,
1074                        self.network.as_ref(),
1075                        self.signer.as_ref(),
1076                        vote_extension,
1077                    );
1078                }
1079            }
1080
1081            ConsensusMessage::Vote2Msg(vote) => {
1082                if vote.view != self.state.current_view {
1083                    return Ok(());
1084                }
1085                if vote.vote_type != VoteType::Vote2 {
1086                    return Ok(());
1087                }
1088
1089                // Verify vote extension (ABCI++ Vote Extensions) if present.
1090                if let Some(ref ext) = vote.extension
1091                    && !self
1092                        .app
1093                        .verify_vote_extension(ext, &vote.block_hash, vote.validator)
1094                {
1095                    warn!(
1096                        validator = %vote.validator,
1097                        view = %vote.view,
1098                        "rejecting vote2: invalid vote extension"
1099                    );
1100                    return Ok(());
1101                }
1102
1103                let result = self
1104                    .vote_collector
1105                    .add_vote(
1106                        &self.state.validator_set,
1107                        vote,
1108                        self.state.current_epoch.number,
1109                    )
1110                    .c(d!())?;
1111                self.handle_equivocation(&result);
1112                if let Some(outer_qc) = result.qc {
1113                    self.on_double_cert_formed(outer_qc, result.extensions)
1114                        .await;
1115                }
1116            }
1117
1118            ConsensusMessage::Wish {
1119                target_view,
1120                validator,
1121                highest_qc,
1122                signature,
1123            } => {
1124                // Validate carried highest_qc (C4 mitigation).
1125                // Both signature authenticity and 2f+1 quorum weight must pass.
1126                if let Some(ref qc) = highest_qc
1127                    && qc.aggregate_signature.count() > 0
1128                {
1129                    let qc_bytes = Vote::signing_bytes(
1130                        &self.state.chain_id_hash,
1131                        qc.epoch,
1132                        qc.view,
1133                        &qc.block_hash,
1134                        VoteType::Vote,
1135                    );
1136                    if !tokio::task::block_in_place(|| {
1137                        self.verifier.verify_aggregate(
1138                            &self.state.validator_set,
1139                            &qc_bytes,
1140                            &qc.aggregate_signature,
1141                        )
1142                    }) {
1143                        warn!(validator = %validator, "wish carries invalid highest_qc signature");
1144                        return Ok(());
1145                    }
1146                    // Only enforce quorum against the current validator set if the
1147                    // QC was formed in the current epoch. A QC from a previous epoch
1148                    // may not meet the new set's quorum threshold (e.g., after a
1149                    // validator power change), but its signatures were already verified
1150                    // above, so it remains a valid proof of finality in its own epoch.
1151                    if qc.epoch == self.state.current_epoch.number
1152                        && !hotmint_crypto::has_quorum(
1153                            &self.state.validator_set,
1154                            &qc.aggregate_signature,
1155                        )
1156                    {
1157                        warn!(validator = %validator, "wish carries highest_qc without quorum");
1158                        return Ok(());
1159                    }
1160                }
1161
1162                if let Some(tc) = self.pacemaker.add_wish(
1163                    &self.state.validator_set,
1164                    target_view,
1165                    validator,
1166                    highest_qc,
1167                    signature,
1168                ) {
1169                    info!(
1170                        validator = %self.state.validator_id,
1171                        view = %tc.view,
1172                        "TC formed, advancing view"
1173                    );
1174                    self.network
1175                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1176                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1177                }
1178            }
1179
1180            ConsensusMessage::TimeoutCert(tc) => {
1181                if self.pacemaker.should_relay_tc(&tc) {
1182                    self.network
1183                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1184                }
1185                let new_view = ViewNumber(tc.view.as_u64() + 1);
1186                if new_view > self.state.current_view {
1187                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1188                }
1189            }
1190
1191            ConsensusMessage::StatusCert {
1192                locked_qc,
1193                validator,
1194                signature: _,
1195            } => {
1196                if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
1197                    if let Some(ref qc) = locked_qc {
1198                        self.state.update_highest_qc(qc);
1199                    }
1200                    self.status_senders.insert(validator);
1201                    let status_power: u64 = self
1202                        .status_senders
1203                        .iter()
1204                        .map(|v| self.state.validator_set.power_of(*v))
1205                        .sum();
1206                    // Leader's own power counts toward quorum
1207                    let total_power =
1208                        status_power + self.state.validator_set.power_of(self.state.validator_id);
1209                    if total_power >= self.state.validator_set.quorum_threshold() {
1210                        self.try_propose().await;
1211                    }
1212                }
1213            }
1214
1215            ConsensusMessage::Evidence(proof) => {
1216                // C-6: Cryptographically verify evidence before accepting.
1217                // Both signatures must be valid from the alleged validator for
1218                // different block hashes at the same (view, vote_type).
1219                let vs = &self.state.validator_set;
1220                let vi = match vs.get(proof.validator) {
1221                    Some(vi) => vi,
1222                    None => {
1223                        warn!(validator = %proof.validator, "evidence for unknown validator");
1224                        return Ok(());
1225                    }
1226                };
1227                if proof.block_hash_a == proof.block_hash_b {
1228                    warn!(validator = %proof.validator, "evidence has identical block hashes");
1229                    return Ok(());
1230                }
1231                // Use the epoch from the proof itself (not local epoch) so
1232                // cross-epoch evidence can be verified correctly.
1233                let bytes_a = Vote::signing_bytes(
1234                    &self.state.chain_id_hash,
1235                    proof.epoch,
1236                    proof.view,
1237                    &proof.block_hash_a,
1238                    proof.vote_type,
1239                );
1240                let bytes_b = Vote::signing_bytes(
1241                    &self.state.chain_id_hash,
1242                    proof.epoch,
1243                    proof.view,
1244                    &proof.block_hash_b,
1245                    proof.vote_type,
1246                );
1247                if !self
1248                    .verifier
1249                    .verify(&vi.public_key, &bytes_a, &proof.signature_a)
1250                    || !self
1251                        .verifier
1252                        .verify(&vi.public_key, &bytes_b, &proof.signature_b)
1253                {
1254                    warn!(validator = %proof.validator, "evidence has invalid signatures");
1255                    return Ok(());
1256                }
1257
1258                info!(
1259                    validator = %proof.validator,
1260                    view = %proof.view,
1261                    "received valid evidence gossip"
1262                );
1263                if let Err(e) = self.app.on_evidence(&proof) {
1264                    warn!(error = %e, "on_evidence callback failed for gossiped proof");
1265                }
1266                if let Some(ref mut store) = self.evidence_store {
1267                    store.put_evidence(proof);
1268                }
1269            }
1270        }
1271        Ok(())
1272    }
1273
1274    fn handle_equivocation(&mut self, result: &crate::vote_collector::VoteResult) {
1275        if let Some(ref proof) = result.equivocation {
1276            warn!(
1277                validator = %proof.validator,
1278                view = %proof.view,
1279                "equivocation detected!"
1280            );
1281            if let Err(e) = self.app.on_evidence(proof) {
1282                warn!(error = %e, "on_evidence callback failed");
1283            }
1284            self.network.broadcast_evidence(proof);
1285            if let Some(ref mut store) = self.evidence_store {
1286                store.put_evidence(proof.clone());
1287            }
1288        }
1289    }
1290
1291    async fn on_qc_formed(&mut self, qc: QuorumCertificate) {
1292        // Save the QC so we can reliably pair it when forming a DoubleCert
1293        self.current_view_qc = Some(qc.clone());
1294
1295        view_protocol::on_votes_collected(
1296            &mut self.state,
1297            qc.clone(),
1298            self.network.as_ref(),
1299            self.signer.as_ref(),
1300        );
1301
1302        // Leader also does vote2 for its own prepare (self-vote for step 5)
1303        // Generate vote extension (ABCI++ Vote Extensions) if the block is available.
1304        let vote_extension = {
1305            let store = self.store.read();
1306            store.get_block(&qc.block_hash).and_then(|block| {
1307                let ctx = BlockContext {
1308                    height: block.height,
1309                    view: self.state.current_view,
1310                    proposer: block.proposer,
1311                    epoch: self.state.current_epoch.number,
1312                    epoch_start_view: self.state.current_epoch.start_view,
1313                    validator_set: &self.state.validator_set,
1314                    vote_extensions: vec![],
1315                };
1316                self.app.extend_vote(&block, &ctx)
1317            })
1318        };
1319        let vote_bytes = Vote::signing_bytes(
1320            &self.state.chain_id_hash,
1321            self.state.current_epoch.number,
1322            self.state.current_view,
1323            &qc.block_hash,
1324            VoteType::Vote2,
1325        );
1326        let signature = self.signer.sign(&vote_bytes);
1327        let vote = Vote {
1328            block_hash: qc.block_hash,
1329            view: self.state.current_view,
1330            validator: self.state.validator_id,
1331            signature,
1332            vote_type: VoteType::Vote2,
1333            extension: vote_extension,
1334        };
1335
1336        // Lock on this QC
1337        self.state.update_locked_qc(&qc);
1338
1339        let next_leader_id =
1340            leader::next_leader(&self.state.validator_set, self.state.current_view);
1341        if next_leader_id == self.state.validator_id {
1342            // We are the next leader, collect vote2 locally
1343            match self.vote_collector.add_vote(
1344                &self.state.validator_set,
1345                vote,
1346                self.state.current_epoch.number,
1347            ) {
1348                Ok(result) => {
1349                    self.handle_equivocation(&result);
1350                    if let Some(outer_qc) = result.qc {
1351                        self.on_double_cert_formed(outer_qc, result.extensions)
1352                            .await;
1353                    }
1354                }
1355                Err(e) => warn!(error = %e, "failed to add self vote2"),
1356            }
1357        } else {
1358            self.network
1359                .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
1360        }
1361    }
1362
1363    async fn on_double_cert_formed(
1364        &mut self,
1365        outer_qc: QuorumCertificate,
1366        extensions: Vec<(ValidatorId, Vec<u8>)>,
1367    ) {
1368        // Use the QC we explicitly saved from this view's first voting round
1369        let inner_qc = match self.current_view_qc.take() {
1370            Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
1371            _ => {
1372                // Fallback to locked_qc or highest_qc
1373                match &self.state.locked_qc {
1374                    Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
1375                    _ => match &self.state.highest_qc {
1376                        Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
1377                        _ => {
1378                            warn!(
1379                                validator = %self.state.validator_id,
1380                                "double cert formed but can't find matching inner QC"
1381                            );
1382                            return;
1383                        }
1384                    },
1385                }
1386            }
1387        };
1388
1389        let dc = DoubleCertificate {
1390            inner_qc,
1391            outer_qc,
1392            vote_extensions: extensions,
1393        };
1394
1395        info!(
1396            validator = %self.state.validator_id,
1397            view = %self.state.current_view,
1398            hash = %dc.inner_qc.block_hash,
1399            "double certificate formed, committing"
1400        );
1401
1402        // Commit
1403        self.apply_commit(&dc, "double-cert").await;
1404
1405        self.state.highest_double_cert = Some(dc.clone());
1406
1407        // Advance to next view — as new leader, include DC in proposal
1408        self.advance_view(ViewEntryTrigger::DoubleCert(dc)).await;
1409    }
1410
1411    async fn handle_timeout(&mut self) {
1412        // Skip wish building/signing entirely when we have no voting power (fullnodes).
1413        // build_wish involves a cryptographic signing operation that serves no purpose
1414        // when the wish will never be broadcast or counted toward a TC.
1415        let has_power = self.state.validator_set.power_of(self.state.validator_id) > 0;
1416        if !has_power {
1417            self.pacemaker.on_timeout();
1418            return;
1419        }
1420
1421        info!(
1422            validator = %self.state.validator_id,
1423            view = %self.state.current_view,
1424            "view timeout, sending wish"
1425        );
1426
1427        let wish = self.pacemaker.build_wish(
1428            &self.state.chain_id_hash,
1429            self.state.current_epoch.number,
1430            self.state.current_view,
1431            self.state.validator_id,
1432            self.state.highest_qc.clone(),
1433            self.signer.as_ref(),
1434        );
1435
1436        self.network.broadcast(wish.clone());
1437
1438        // Also process our own wish
1439        if let ConsensusMessage::Wish {
1440            target_view,
1441            validator,
1442            highest_qc,
1443            signature,
1444        } = wish
1445            && let Some(tc) = self.pacemaker.add_wish(
1446                &self.state.validator_set,
1447                target_view,
1448                validator,
1449                highest_qc,
1450                signature,
1451            )
1452        {
1453            self.network
1454                .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1455            self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1456            return;
1457        }
1458
1459        // Exponential backoff on repeated timeouts
1460        self.pacemaker.on_timeout();
1461    }
1462
1463    /// Post-commit processing: store commit QCs, index txs, mark evidence,
1464    /// track liveness, persist state, and log WAL done.
1465    /// Shared by both `apply_commit` (normal DC path) and the on_proposal
1466    /// fast-forward path to ensure all commit side-effects happen consistently.
1467    fn process_commit_result(&mut self, result: &CommitResult) {
1468        if result.committed_blocks.is_empty() {
1469            return;
1470        }
1471        {
1472            let mut s = self.store.write();
1473            for (i, block) in result.committed_blocks.iter().enumerate() {
1474                if result.commit_qc.block_hash == block.hash {
1475                    s.put_commit_qc(block.height, result.commit_qc.clone());
1476                }
1477                let txs = crate::commit::decode_payload(&block.payload);
1478                for (tx_idx, tx) in txs.iter().enumerate() {
1479                    let tx_hash = *blake3::hash(tx).as_bytes();
1480                    s.put_tx_index(tx_hash, block.height, tx_idx as u32);
1481                }
1482                if let Some(resp) = result.block_responses.get(i) {
1483                    s.put_block_results(block.height, resp.clone());
1484                }
1485            }
1486            s.flush();
1487        }
1488        if let Some(ref mut ev_store) = self.evidence_store {
1489            for block in &result.committed_blocks {
1490                for proof in &block.evidence {
1491                    ev_store.mark_committed(proof.view, proof.validator);
1492                }
1493                for proof in ev_store.get_pending() {
1494                    if proof.view <= block.view {
1495                        ev_store.mark_committed(proof.view, proof.validator);
1496                    }
1497                }
1498            }
1499        }
1500        self.liveness_tracker.record_commit(
1501            &self.state.validator_set,
1502            &result.commit_qc.aggregate_signature.signers,
1503        );
1504        self.persist_state();
1505        if let Some(ref mut wal) = self.wal
1506            && let Err(e) = wal.log_commit_done(self.state.last_committed_height)
1507        {
1508            warn!(error = %e, "WAL: failed to log commit done");
1509        }
1510    }
1511
1512    /// Apply the result of a successful try_commit: update app_hash, pending epoch,
1513    /// store commit QCs, and flush. Called from both normal and fast-forward commit paths.
1514    async fn apply_commit(&mut self, dc: &DoubleCertificate, context: &str) {
1515        // WAL: log commit intent before executing blocks.
1516        if let Some(ref mut wal) = self.wal {
1517            let target_height = {
1518                let store = self.store.read();
1519                store.get_block(&dc.inner_qc.block_hash).map(|b| b.height)
1520            };
1521            if let Some(h) = target_height
1522                && let Err(e) = wal.log_commit_intent(h)
1523            {
1524                warn!(error = %e, "WAL: failed to log commit intent");
1525            }
1526        }
1527
1528        let store = self.store.read();
1529        match try_commit(
1530            dc,
1531            store.as_ref(),
1532            self.app.as_ref(),
1533            &mut self.state.last_committed_height,
1534            &self.state.current_epoch,
1535        ) {
1536            Ok(result) => {
1537                if !result.committed_blocks.is_empty() {
1538                    self.state.last_app_hash = result.last_app_hash;
1539                }
1540                if result.pending_epoch.is_some() {
1541                    self.pending_epoch = result.pending_epoch.clone();
1542                }
1543                drop(store);
1544                self.process_commit_result(&result);
1545            }
1546            Err(e) => {
1547                warn!(error = %e, "try_commit failed during {context}");
1548                drop(store);
1549            }
1550        }
1551    }
1552
1553    /// Cryptographically validate a DoubleCertificate:
1554    /// 1. inner and outer QC must reference the same block hash
1555    /// 2. inner QC aggregate signature (Vote1) must be valid and reach quorum
1556    /// 3. outer QC aggregate signature (Vote2) must be valid and reach quorum
1557    ///
1558    /// Note on quorum and epoch transitions: DCs are always formed in the same epoch as
1559    /// the block they commit (vote_collector enforces quorum at formation time).  When
1560    /// a DC is received by a node that has already transitioned to a new epoch, the
1561    /// validator set may differ.  We enforce quorum against the current validator set
1562    /// as the best available reference; a legitimate DC from a prior epoch should still
1563    /// satisfy quorum against the new set unless the set shrank significantly.
1564    fn validate_double_cert(&self, dc: &DoubleCertificate) -> bool {
1565        if dc.inner_qc.block_hash != dc.outer_qc.block_hash {
1566            warn!("double cert inner/outer block_hash mismatch");
1567            return false;
1568        }
1569        let vs = &self.state.validator_set;
1570        let inner_bytes = Vote::signing_bytes(
1571            &self.state.chain_id_hash,
1572            dc.inner_qc.epoch,
1573            dc.inner_qc.view,
1574            &dc.inner_qc.block_hash,
1575            VoteType::Vote,
1576        );
1577        if !self
1578            .verifier
1579            .verify_aggregate(vs, &inner_bytes, &dc.inner_qc.aggregate_signature)
1580        {
1581            warn!("double cert inner QC signature invalid");
1582            return false;
1583        }
1584        if !hotmint_crypto::has_quorum(vs, &dc.inner_qc.aggregate_signature) {
1585            warn!("double cert inner QC below quorum threshold");
1586            return false;
1587        }
1588        let outer_bytes = Vote::signing_bytes(
1589            &self.state.chain_id_hash,
1590            dc.outer_qc.epoch,
1591            dc.outer_qc.view,
1592            &dc.outer_qc.block_hash,
1593            VoteType::Vote2,
1594        );
1595        if !self
1596            .verifier
1597            .verify_aggregate(vs, &outer_bytes, &dc.outer_qc.aggregate_signature)
1598        {
1599            warn!("double cert outer QC signature invalid");
1600            return false;
1601        }
1602        if !hotmint_crypto::has_quorum(vs, &dc.outer_qc.aggregate_signature) {
1603            warn!("double cert outer QC below quorum threshold");
1604            return false;
1605        }
1606        true
1607    }
1608
1609    fn persist_state(&mut self) {
1610        if let Some(p) = self.persistence.as_mut() {
1611            p.save_current_view(self.state.current_view);
1612            if let Some(ref qc) = self.state.locked_qc {
1613                p.save_locked_qc(qc);
1614            }
1615            if let Some(ref qc) = self.state.highest_qc {
1616                p.save_highest_qc(qc);
1617            }
1618            p.save_last_committed_height(self.state.last_committed_height);
1619            p.save_current_epoch(&self.state.current_epoch);
1620            p.save_last_app_hash(self.state.last_app_hash);
1621            p.flush();
1622        }
1623    }
1624
1625    async fn advance_view(&mut self, trigger: ViewEntryTrigger) {
1626        let new_view = match &trigger {
1627            ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
1628            ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
1629            ViewEntryTrigger::Genesis => ViewNumber(1),
1630        };
1631        self.advance_view_to(new_view, trigger).await;
1632    }
1633
1634    async fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
1635        if new_view <= self.state.current_view {
1636            return;
1637        }
1638
1639        // Reset backoff on successful progress (DoubleCert path)
1640        let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
1641
1642        // Capture vote extensions from DoubleCertificate for the next create_payload.
1643        if let ViewEntryTrigger::DoubleCert(ref dc) = trigger {
1644            self.state.pending_vote_extensions = dc.vote_extensions.clone();
1645        } else {
1646            self.state.pending_vote_extensions.clear();
1647        }
1648
1649        self.vote_collector.clear_view(self.state.current_view);
1650        self.vote_collector.prune_before(self.state.current_view);
1651        self.pacemaker.clear_view(self.state.current_view);
1652        self.pacemaker.prune_before(self.state.current_view);
1653        self.status_senders.clear();
1654        self.current_view_qc = None;
1655
1656        // Epoch transition: apply pending validator set change when we reach the
1657        // epoch's start_view. The start_view is set deterministically (commit_view + 2)
1658        // so all honest nodes apply the transition at the same view.
1659        if self
1660            .pending_epoch
1661            .as_ref()
1662            .is_some_and(|e| new_view >= e.start_view)
1663        {
1664            // SAFETY: we just verified `pending_epoch` is `Some` above.
1665            let Some(new_epoch) = self.pending_epoch.take() else {
1666                unreachable!("pending_epoch was Some in the condition check");
1667            };
1668            info!(
1669                validator = %self.state.validator_id,
1670                old_epoch = %self.state.current_epoch.number,
1671                new_epoch = %new_epoch.number,
1672                start_view = %new_epoch.start_view,
1673                validators = new_epoch.validator_set.validator_count(),
1674                "epoch transition"
1675            );
1676            // Report offline validators to the application before transitioning.
1677            let offline = self.liveness_tracker.offline_validators();
1678            if !offline.is_empty() {
1679                let evidence: Vec<OfflineEvidence> = offline
1680                    .iter()
1681                    .map(|&(validator, missed, total)| OfflineEvidence {
1682                        validator,
1683                        missed_commits: missed,
1684                        total_commits: total,
1685                        evidence_height: self.state.last_committed_height,
1686                    })
1687                    .collect();
1688                info!(
1689                    offline_count = evidence.len(),
1690                    epoch = %self.state.current_epoch.number,
1691                    "reporting offline validators"
1692                );
1693                if let Err(e) = self.app.on_offline_validators(&evidence) {
1694                    warn!(error = %e, "on_offline_validators callback failed");
1695                }
1696            }
1697            self.liveness_tracker.reset();
1698
1699            self.state.validator_set = new_epoch.validator_set.clone();
1700            self.state.current_epoch = new_epoch;
1701            // Notify network layer of the new validator set and epoch
1702            self.network
1703                .on_epoch_change(self.state.current_epoch.number, &self.state.validator_set);
1704            // Full clear: old votes/wishes are from the previous epoch's validator set
1705            self.vote_collector = VoteCollector::new();
1706            self.pacemaker = Pacemaker::with_config(self.pacemaker_config.clone());
1707        }
1708
1709        view_protocol::enter_view(
1710            &mut self.state,
1711            new_view,
1712            trigger,
1713            self.network.as_ref(),
1714            self.signer.as_ref(),
1715        );
1716
1717        if is_progress {
1718            self.pacemaker.reset_on_progress();
1719        } else {
1720            self.pacemaker.reset_timer();
1721        }
1722
1723        self.persist_state();
1724
1725        // If we're the leader, propose immediately.
1726        // Note: in a full implementation, the leader would collect StatusCerts
1727        // before proposing (status_senders quorum gate). Currently the immediate
1728        // propose path is required for liveness across epoch transitions where
1729        // cross-epoch verification complexity can stall status collection.
1730        if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
1731            self.try_propose().await;
1732        }
1733    }
1734}
1735
1736// ---------------------------------------------------------------------------
1737// Regression tests for sub-quorum certificate injection (R-29, R-32)
1738// ---------------------------------------------------------------------------
1739
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;

    use parking_lot::RwLock;

    use hotmint_crypto::{Ed25519Signer, Ed25519Verifier};
    use hotmint_types::Signer as SignerTrait;
    use hotmint_types::certificate::QuorumCertificate;
    use hotmint_types::crypto::AggregateSignature;
    use hotmint_types::epoch::EpochNumber;
    use hotmint_types::validator::{ValidatorId, ValidatorInfo};
    use hotmint_types::vote::{Vote, VoteType};
    use tokio::sync::mpsc;

    use crate::application::NoopApplication;
    use crate::network::NetworkSink;
    use crate::state::ConsensusState;
    use crate::store::MemoryBlockStore;

    // Minimal no-op network for unit tests — messages are silently discarded.
    struct DevNullNetwork;
    impl NetworkSink for DevNullNetwork {
        fn broadcast(&self, _: ConsensusMessage) {}
        fn send_to(&self, _: ValidatorId, _: ConsensusMessage) {}
    }

    /// Default chain_id_hash for tests — matches ConsensusState::new() which uses chain_id = "".
    fn test_chain_id_hash() -> [u8; 32] {
        *blake3::hash(b"").as_bytes()
    }

    /// Builds a 4-validator set (ids 0..4, power 1 each) together with the
    /// signers that own the corresponding keys, in validator-index order.
    fn make_validator_set_4() -> (ValidatorSet, Vec<Ed25519Signer>) {
        let signers: Vec<Ed25519Signer> = (0..4)
            .map(|i| Ed25519Signer::generate(ValidatorId(i)))
            .collect();
        let infos: Vec<ValidatorInfo> = signers
            .iter()
            .map(|s| ValidatorInfo {
                id: s.validator_id(),
                public_key: s.public_key(),
                power: 1,
            })
            .collect();
        (ValidatorSet::new(infos), signers)
    }

    /// Builds an engine over an in-memory store, a discard-everything network
    /// and a no-op application. The returned sender must be kept alive by the
    /// caller for as long as the engine's inbound channel should stay open.
    fn make_test_engine(
        vid: ValidatorId,
        vs: ValidatorSet,
        signer: Ed25519Signer,
    ) -> (
        ConsensusEngine,
        mpsc::Sender<(Option<ValidatorId>, ConsensusMessage)>,
    ) {
        let (tx, rx) = mpsc::channel(64);
        let store = Arc::new(RwLock::new(
            Box::new(MemoryBlockStore::new()) as Box<dyn crate::store::BlockStore>
        ));
        let state = ConsensusState::new(vid, vs);
        let engine = ConsensusEngine::new(
            state,
            store,
            Box::new(DevNullNetwork),
            Box::new(NoopApplication),
            Box::new(signer),
            rx,
            EngineConfig {
                verifier: Box::new(Ed25519Verifier),
                pacemaker: None,
                persistence: None,
                evidence_store: None,
                wal: None,
            },
        );
        (engine, tx)
    }

    /// Aggregates epoch-0 `Vote` signatures over (`hash`, `view`) from the
    /// signers at the given validator `indices` of a 4-validator set.
    fn aggregate_votes(
        signers: &[Ed25519Signer],
        indices: &[usize],
        hash: &BlockHash,
        view: ViewNumber,
    ) -> AggregateSignature {
        let vote_bytes = Vote::signing_bytes(
            &test_chain_id_hash(),
            EpochNumber(0),
            view,
            hash,
            VoteType::Vote,
        );
        let mut agg = AggregateSignature::new(4);
        for &i in indices {
            agg.add(i, SignerTrait::sign(&signers[i], &vote_bytes))
                .unwrap();
        }
        agg
    }

    /// Builds an epoch-0 QC for the genesis block/view signed by the signers
    /// at `indices`.
    fn genesis_qc(signers: &[Ed25519Signer], indices: &[usize]) -> QuorumCertificate {
        let block_hash = BlockHash::GENESIS;
        let view = ViewNumber::GENESIS;
        QuorumCertificate {
            block_hash,
            view,
            aggregate_signature: aggregate_votes(signers, indices, &block_hash, view),
            epoch: EpochNumber(0),
        }
    }

    /// Builds a height-1 / view-1 proposal from validator 1 that carries
    /// `justify`, signed with validator 1's key.
    fn proposal_from_v1(signers: &[Ed25519Signer], justify: QuorumCertificate) -> ConsensusMessage {
        let mut block = Block::genesis();
        block.height = Height(1);
        block.view = ViewNumber(1);
        block.proposer = ValidatorId(1);
        block.hash = block.compute_hash();
        let proposal_bytes = crate::view_protocol::proposal_signing_bytes(
            &test_chain_id_hash(),
            EpochNumber(0),
            &block,
            &justify,
        );
        let signature = SignerTrait::sign(&signers[1], &proposal_bytes);

        ConsensusMessage::Propose {
            block: Box::new(block),
            justify: Box::new(justify),
            double_cert: None,
            signature,
        }
    }

    // R-29 regression: a Propose message whose justify QC is signed by fewer than
    // 2f+1 validators must be rejected by verify_message().
    #[test]
    fn r29_propose_sub_quorum_justify_rejected_by_verify_message() {
        let (vs, signers) = make_validator_set_4();
        // Use a fresh signer for the engine; verify_message only needs the engine's
        // validator set and verifier, not its own signing key.
        let engine_signer = Ed25519Signer::generate(ValidatorId(0));
        let (engine, _tx) = make_test_engine(ValidatorId(0), vs, engine_signer);

        // Justify QC signed by exactly 1 of 4 validators (V1) — below 2f+1 = 3.
        let sub_quorum_qc = genesis_qc(&signers, &[1]);

        // Proposal from V1 carrying this sub-quorum justify.
        let msg = proposal_from_v1(&signers, sub_quorum_qc);

        assert!(
            !engine.verify_message(&msg),
            "R-29 regression: Propose with sub-quorum justify QC must be rejected by verify_message"
        );
    }

    // R-29 regression: a Propose message with a full quorum justify QC (3/4) must pass.
    #[test]
    fn r29_propose_full_quorum_justify_accepted_by_verify_message() {
        let (vs, signers) = make_validator_set_4();
        let engine_signer = Ed25519Signer::generate(ValidatorId(0));
        let (engine, _tx) = make_test_engine(ValidatorId(0), vs, engine_signer);

        // 3 of 4 signers (V0, V1, V2) — meets 2f+1 threshold.
        let full_quorum_qc = genesis_qc(&signers, &[0, 1, 2]);

        let msg = proposal_from_v1(&signers, full_quorum_qc);

        assert!(
            engine.verify_message(&msg),
            "R-29: Propose with full quorum justify QC must pass verify_message"
        );
    }

    // R-32 regression: a Wish carrying a sub-quorum highest_qc must cause
    // verify_highest_qc_in_wish to treat the QC as invalid and return false,
    // which causes handle_message to discard the Wish without forwarding it
    // to the pacemaker.
    //
    // We verify the sub-component: has_quorum returns false for a 1-of-4 aggregate,
    // ensuring the guard in handle_message fires.
    #[test]
    fn r32_sub_quorum_highest_qc_fails_has_quorum() {
        let (vs, signers) = make_validator_set_4();

        let hash = BlockHash([1u8; 32]);
        let qc_view = ViewNumber(1);

        // Aggregate with only 1 signer (V0) — sub-quorum.
        let agg = aggregate_votes(&signers, &[0], &hash, qc_view);
        assert!(
            !hotmint_crypto::has_quorum(&vs, &agg),
            "R-32 regression: 1-of-4 signed QC must not satisfy has_quorum"
        );

        // Aggregate with 3 signers (V0, V1, V2) — full quorum.
        let agg_full = aggregate_votes(&signers, &[0, 1, 2], &hash, qc_view);
        assert!(
            hotmint_crypto::has_quorum(&vs, &agg_full),
            "R-32: 3-of-4 signed QC must satisfy has_quorum"
        );
    }
}