Skip to main content

hotmint_consensus/
engine.rs

1use ruc::*;
2
3use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5
6use tokio::sync::RwLock;
7
8use crate::application::Application;
9use crate::commit::try_commit;
10use crate::evidence_store::EvidenceStore;
11use crate::leader;
12use crate::network::NetworkSink;
13use crate::pacemaker::{Pacemaker, PacemakerConfig};
14use crate::state::{ConsensusState, ViewStep};
15use crate::store::BlockStore;
16use crate::view_protocol::{self, ViewEntryTrigger};
17use crate::vote_collector::VoteCollector;
18
19use hotmint_types::epoch::Epoch;
20use hotmint_types::vote::VoteType;
21use hotmint_types::*;
22use tokio::sync::mpsc;
23use tracing::{info, warn};
24
25/// Shared block store type used by the engine, RPC, and sync responder.
26pub type SharedBlockStore = Arc<RwLock<Box<dyn BlockStore>>>;
27
28/// Trait for persisting critical consensus state across restarts.
29pub trait StatePersistence: Send {
30    fn save_current_view(&mut self, view: ViewNumber);
31    fn save_locked_qc(&mut self, qc: &QuorumCertificate);
32    fn save_highest_qc(&mut self, qc: &QuorumCertificate);
33    fn save_last_committed_height(&mut self, height: Height);
34    fn save_current_epoch(&mut self, epoch: &Epoch);
35    fn save_last_app_hash(&mut self, hash: BlockHash);
36    fn flush(&self);
37}
38
39pub struct ConsensusEngine {
40    state: ConsensusState,
41    store: SharedBlockStore,
42    network: Box<dyn NetworkSink>,
43    app: Box<dyn Application>,
44    signer: Box<dyn Signer>,
45    verifier: Box<dyn Verifier>,
46    vote_collector: VoteCollector,
47    pacemaker: Pacemaker,
48    pacemaker_config: PacemakerConfig,
49    msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
50    /// Collected unique status cert senders (for leader, per view)
51    status_senders: HashSet<ValidatorId>,
52    /// The QC formed in this view's first voting round (used to build DoubleCert)
53    current_view_qc: Option<QuorumCertificate>,
54    /// Pending epoch transition (set by try_commit, applied in advance_view_to)
55    pending_epoch: Option<Epoch>,
56    /// Optional state persistence (for crash recovery).
57    persistence: Option<Box<dyn StatePersistence>>,
58    /// Optional evidence store for persisting equivocation proofs.
59    evidence_store: Option<Box<dyn EvidenceStore>>,
60}
61
62/// Configuration for ConsensusEngine.
63pub struct EngineConfig {
64    pub verifier: Box<dyn Verifier>,
65    pub pacemaker: Option<PacemakerConfig>,
66    pub persistence: Option<Box<dyn StatePersistence>>,
67    pub evidence_store: Option<Box<dyn EvidenceStore>>,
68}
69
70impl EngineConfig {
71    /// Create an `EngineConfig` with the given verifier and defaults
72    /// (no custom pacemaker, no persistence, no evidence store).
73    pub fn new(verifier: Box<dyn Verifier>) -> Self {
74        Self {
75            verifier,
76            pacemaker: None,
77            persistence: None,
78            evidence_store: None,
79        }
80    }
81
82    /// Set a custom pacemaker configuration.
83    pub fn with_pacemaker(mut self, pacemaker: PacemakerConfig) -> Self {
84        self.pacemaker = Some(pacemaker);
85        self
86    }
87
88    /// Set a state persistence backend.
89    pub fn with_persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
90        self.persistence = Some(persistence);
91        self
92    }
93}
94
95/// Builder for constructing a `ConsensusEngine` with a fluent API.
96///
97/// # Example
98/// ```rust,ignore
99/// let engine = ConsensusEngineBuilder::new()
100///     .state(state)
101///     .store(store)
102///     .network(network)
103///     .app(app)
104///     .signer(signer)
105///     .messages(msg_rx)
106///     .verifier(verifier)
107///     .build()
108///     .expect("all required fields must be set");
109/// ```
110pub struct ConsensusEngineBuilder {
111    state: Option<ConsensusState>,
112    store: Option<SharedBlockStore>,
113    network: Option<Box<dyn NetworkSink>>,
114    app: Option<Box<dyn Application>>,
115    signer: Option<Box<dyn Signer>>,
116    msg_rx: Option<mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>>,
117    verifier: Option<Box<dyn Verifier>>,
118    pacemaker: Option<PacemakerConfig>,
119    persistence: Option<Box<dyn StatePersistence>>,
120    evidence_store: Option<Box<dyn EvidenceStore>>,
121}
122
123impl ConsensusEngineBuilder {
124    pub fn new() -> Self {
125        Self {
126            state: None,
127            store: None,
128            network: None,
129            app: None,
130            signer: None,
131            msg_rx: None,
132            verifier: None,
133            pacemaker: None,
134            persistence: None,
135            evidence_store: None,
136        }
137    }
138
139    pub fn state(mut self, state: ConsensusState) -> Self {
140        self.state = Some(state);
141        self
142    }
143
144    pub fn store(mut self, store: SharedBlockStore) -> Self {
145        self.store = Some(store);
146        self
147    }
148
149    pub fn network(mut self, network: Box<dyn NetworkSink>) -> Self {
150        self.network = Some(network);
151        self
152    }
153
154    pub fn app(mut self, app: Box<dyn Application>) -> Self {
155        self.app = Some(app);
156        self
157    }
158
159    pub fn signer(mut self, signer: Box<dyn Signer>) -> Self {
160        self.signer = Some(signer);
161        self
162    }
163
164    pub fn messages(
165        mut self,
166        msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
167    ) -> Self {
168        self.msg_rx = Some(msg_rx);
169        self
170    }
171
172    pub fn verifier(mut self, verifier: Box<dyn Verifier>) -> Self {
173        self.verifier = Some(verifier);
174        self
175    }
176
177    pub fn pacemaker(mut self, config: PacemakerConfig) -> Self {
178        self.pacemaker = Some(config);
179        self
180    }
181
182    pub fn persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
183        self.persistence = Some(persistence);
184        self
185    }
186
187    pub fn evidence_store(mut self, store: Box<dyn EvidenceStore>) -> Self {
188        self.evidence_store = Some(store);
189        self
190    }
191
192    pub fn build(self) -> ruc::Result<ConsensusEngine> {
193        let state = self.state.ok_or_else(|| ruc::eg!("state is required"))?;
194        let store = self.store.ok_or_else(|| ruc::eg!("store is required"))?;
195        let network = self
196            .network
197            .ok_or_else(|| ruc::eg!("network is required"))?;
198        let app = self.app.ok_or_else(|| ruc::eg!("app is required"))?;
199        let signer = self.signer.ok_or_else(|| ruc::eg!("signer is required"))?;
200        let msg_rx = self
201            .msg_rx
202            .ok_or_else(|| ruc::eg!("messages (msg_rx) is required"))?;
203        let verifier = self
204            .verifier
205            .ok_or_else(|| ruc::eg!("verifier is required"))?;
206
207        let config = EngineConfig {
208            verifier,
209            pacemaker: self.pacemaker,
210            persistence: self.persistence,
211            evidence_store: self.evidence_store,
212        };
213
214        Ok(ConsensusEngine::new(
215            state, store, network, app, signer, msg_rx, config,
216        ))
217    }
218}
219
220impl Default for ConsensusEngineBuilder {
221    fn default() -> Self {
222        Self::new()
223    }
224}
225
226impl ConsensusEngine {
227    pub fn new(
228        state: ConsensusState,
229        store: SharedBlockStore,
230        network: Box<dyn NetworkSink>,
231        app: Box<dyn Application>,
232        signer: Box<dyn Signer>,
233        msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
234        config: EngineConfig,
235    ) -> Self {
236        let pc = config.pacemaker.unwrap_or_default();
237        Self {
238            state,
239            store,
240            network,
241            app,
242            signer,
243            verifier: config.verifier,
244            vote_collector: VoteCollector::new(),
245            pacemaker: Pacemaker::with_config(pc.clone()),
246            pacemaker_config: pc,
247            msg_rx,
248            status_senders: HashSet::new(),
249            current_view_qc: None,
250            pending_epoch: None,
251            persistence: config.persistence,
252            evidence_store: config.evidence_store,
253        }
254    }
255
256    /// Bootstrap and start the event loop.
257    /// If persisted state was restored (current_view > 1), skip genesis bootstrap.
258    pub async fn run(mut self) {
259        if self.state.current_view.as_u64() <= 1 {
260            self.enter_genesis_view().await;
261        } else {
262            info!(
263                validator = %self.state.validator_id,
264                view = %self.state.current_view,
265                height = %self.state.last_committed_height,
266                "resuming from persisted state"
267            );
268            self.pacemaker.reset_timer();
269        }
270
271        loop {
272            let deadline = self.pacemaker.sleep_until_deadline();
273            tokio::pin!(deadline);
274
275            tokio::select! {
276                Some((sender, msg)) = self.msg_rx.recv() => {
277                    if let Err(e) = self.handle_message(sender, msg).await {
278                        warn!(validator = %self.state.validator_id, error = %e, "error handling message");
279                    }
280                }
281                _ = &mut deadline => {
282                    self.handle_timeout().await;
283                }
284            }
285        }
286    }
287
288    async fn enter_genesis_view(&mut self) {
289        // Create a synthetic genesis QC so the first leader can propose
290        let genesis_qc = QuorumCertificate {
291            block_hash: BlockHash::GENESIS,
292            view: ViewNumber::GENESIS,
293            aggregate_signature: AggregateSignature::new(
294                self.state.validator_set.validator_count(),
295            ),
296            epoch: self.state.current_epoch.number,
297        };
298        self.state.highest_qc = Some(genesis_qc);
299
300        let view = ViewNumber(1);
301        view_protocol::enter_view(
302            &mut self.state,
303            view,
304            ViewEntryTrigger::Genesis,
305            self.network.as_ref(),
306            self.signer.as_ref(),
307        );
308        self.pacemaker.reset_timer();
309
310        // If leader of genesis view, propose immediately
311        if self.state.is_leader() {
312            self.state.step = ViewStep::WaitingForStatus;
313            // In genesis, skip status wait — propose directly
314            self.try_propose().await;
315        }
316    }
317
318    fn try_propose(
319        &mut self,
320    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
321        Box::pin(async move {
322            // Propose is synchronous; acquire, run, and release the lock before any await.
323            let proposed_block = {
324                let mut store = self.store.write().await;
325                view_protocol::propose(
326                    &mut self.state,
327                    store.as_mut(),
328                    self.network.as_ref(),
329                    self.app.as_ref(),
330                    self.signer.as_ref(),
331                )
332            }; // lock released here
333
334            match proposed_block {
335                Ok(block) => {
336                    // Log pending evidence (full block inclusion is a later step)
337                    if let Some(ref store) = self.evidence_store {
338                        let pending = store.get_pending();
339                        if !pending.is_empty() {
340                            info!(
341                                validator = %self.state.validator_id,
342                                count = pending.len(),
343                                "pending equivocation evidence available for inclusion"
344                            );
345                        }
346                    }
347                    // Leader votes for its own block
348                    self.leader_self_vote(block.hash).await;
349                }
350                Err(e) => {
351                    warn!(
352                        validator = %self.state.validator_id,
353                        error = %e,
354                        "failed to propose"
355                    );
356                }
357            }
358        })
359    }
360
361    async fn leader_self_vote(&mut self, block_hash: BlockHash) {
362        let vote_bytes = Vote::signing_bytes(
363            &self.state.chain_id_hash,
364            self.state.current_epoch.number,
365            self.state.current_view,
366            &block_hash,
367            VoteType::Vote,
368        );
369        let signature = self.signer.sign(&vote_bytes);
370        let vote = Vote {
371            block_hash,
372            view: self.state.current_view,
373            validator: self.state.validator_id,
374            signature,
375            vote_type: VoteType::Vote,
376            extension: None,
377        };
378        match self.vote_collector.add_vote(
379            &self.state.validator_set,
380            vote,
381            self.state.current_epoch.number,
382        ) {
383            Ok(result) => {
384                self.handle_equivocation(&result);
385                if let Some(qc) = result.qc {
386                    self.on_qc_formed(qc).await;
387                }
388            }
389            Err(e) => warn!(error = %e, "failed to add self vote"),
390        }
391    }
392}
393
394/// Verify the per-sender individual signature on a consensus message before relaying.
395///
396/// Only checks signatures that can be attributed to a single known validator using the
397/// provided key map.  Aggregate certificates (TimeoutCert, justify/prepare QCs) are
398/// intentionally not fully re-verified here — the receiving engine always does the full
399/// check.  Messages whose signing bytes depend on receiver state (StatusCert needs
400/// `current_view`) are also allowed through; the engine will reject them if invalid.
401///
402/// `ordered_validators` is the validator list in round-robin order (same order used by
403/// `ValidatorSet::leader_for_view`).  Pass an empty slice to skip the leader-for-view
404/// check (e.g., in tests where the set is not available).
405///
406/// Returns `false` when:
407/// - The claimed sender is not in `validator_keys` (unknown/non-validator peer), OR
408/// - The individual signature is cryptographically invalid, OR
409/// - For Prepare: the sender is not the leader for the certificate's view.
410pub fn verify_relay_sender(
411    sender: ValidatorId,
412    msg: &ConsensusMessage,
413    validator_keys: &HashMap<ValidatorId, hotmint_types::crypto::PublicKey>,
414    ordered_validators: &[ValidatorId],
415    chain_id_hash: &[u8; 32],
416    epoch: hotmint_types::epoch::EpochNumber,
417) -> bool {
418    use hotmint_crypto::Ed25519Verifier;
419    use hotmint_types::Verifier;
420    use hotmint_types::vote::Vote;
421    let verifier = Ed25519Verifier;
422    match msg {
423        ConsensusMessage::Propose {
424            block,
425            justify,
426            signature,
427            ..
428        } => {
429            let Some(pk) = validator_keys.get(&block.proposer) else {
430                return false;
431            };
432            let bytes =
433                crate::view_protocol::proposal_signing_bytes(chain_id_hash, epoch, block, justify);
434            Verifier::verify(&verifier, pk, &bytes, signature)
435        }
436        ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
437            let Some(pk) = validator_keys.get(&vote.validator) else {
438                return false;
439            };
440            let bytes = Vote::signing_bytes(
441                chain_id_hash,
442                epoch,
443                vote.view,
444                &vote.block_hash,
445                vote.vote_type,
446            );
447            Verifier::verify(&verifier, pk, &bytes, &vote.signature)
448        }
449        ConsensusMessage::Prepare {
450            certificate,
451            signature,
452        } => {
453            // Prepare is broadcast by the current leader. Verify that the relay
454            // sender is the leader for this view, then check the signature.
455            if !ordered_validators.is_empty() {
456                let n = ordered_validators.len();
457                let expected_leader = ordered_validators[certificate.view.as_u64() as usize % n];
458                if sender != expected_leader {
459                    return false;
460                }
461            }
462            let Some(pk) = validator_keys.get(&sender) else {
463                return false;
464            };
465            let bytes =
466                crate::view_protocol::prepare_signing_bytes(chain_id_hash, epoch, certificate);
467            Verifier::verify(&verifier, pk, &bytes, signature)
468        }
469        ConsensusMessage::Wish {
470            target_view,
471            validator,
472            highest_qc,
473            signature,
474        } => {
475            let Some(pk) = validator_keys.get(validator) else {
476                return false;
477            };
478            let bytes = crate::pacemaker::wish_signing_bytes(
479                chain_id_hash,
480                epoch,
481                *target_view,
482                highest_qc.as_ref(),
483            );
484            Verifier::verify(&verifier, pk, &bytes, signature)
485        }
486        ConsensusMessage::TimeoutCert(tc) => {
487            // Full relay verification: verify each signer's signature + quorum check.
488            let target_view = ViewNumber(tc.view.as_u64() + 1);
489            let n = ordered_validators.len();
490            if n == 0 || tc.aggregate_signature.signers.len() != n {
491                return false;
492            }
493            let mut sig_idx = 0usize;
494            let mut verified_count = 0usize;
495            for (i, &signed) in tc.aggregate_signature.signers.iter().enumerate() {
496                if !signed {
497                    continue;
498                }
499                if i >= n {
500                    return false;
501                }
502                let vid = ordered_validators[i];
503                let Some(pk) = validator_keys.get(&vid) else {
504                    return false;
505                };
506                let hqc = tc.highest_qcs.get(i).and_then(|h| h.as_ref());
507                let bytes =
508                    crate::pacemaker::wish_signing_bytes(chain_id_hash, epoch, target_view, hqc);
509                if sig_idx >= tc.aggregate_signature.signatures.len() {
510                    return false;
511                }
512                if !Verifier::verify(
513                    &verifier,
514                    pk,
515                    &bytes,
516                    &tc.aggregate_signature.signatures[sig_idx],
517                ) {
518                    return false;
519                }
520                sig_idx += 1;
521                verified_count += 1;
522            }
523            if sig_idx != tc.aggregate_signature.signatures.len() {
524                return false;
525            }
526            // Quorum check: require > 2/3 of validators (by count, not power —
527            // full power-based check is done in engine::verify_message).
528            verified_count * 3 > n * 2
529        }
530        ConsensusMessage::StatusCert {
531            validator,
532            signature,
533            locked_qc,
534            ..
535        } => {
536            // StatusCert signing bytes require current_view which we don't have
537            // in the relay context. Verify the sender is a known validator and
538            // the signature is over *some* plausible view (the TC view is not
539            // available here). The engine does full verification with correct view.
540            // At minimum, reject unknown validators.
541            let Some(pk) = validator_keys.get(validator) else {
542                return false;
543            };
544            // We cannot construct exact signing bytes without knowing current_view,
545            // so we accept from known validators only. The engine's verify_message
546            // will do full cryptographic verification.
547            let _ = (pk, signature, locked_qc);
548            true
549        }
550        ConsensusMessage::Evidence(_) => {
551            // Evidence gossip: accept from any known validator.
552            // The engine verifies the proof internally.
553            validator_keys.contains_key(&sender)
554        }
555    }
556}
557
558impl ConsensusEngine {
559    /// Epoch numbers to try when verifying signatures. During epoch transitions,
560    /// some nodes may still be in the previous epoch. We try the current epoch
561    /// first, then fall back to epoch - 1 to tolerate the transition window.
562    fn verification_epochs(&self) -> [EpochNumber; 2] {
563        let cur = self.state.current_epoch.number;
564        let prev = if cur.as_u64() > 0 {
565            EpochNumber(cur.as_u64() - 1)
566        } else {
567            cur
568        };
569        [cur, prev]
570    }
571
572    /// Verify the cryptographic signature on an inbound consensus message.
573    /// Returns false (and logs a warning) if verification fails.
574    /// Messages from past views are skipped (they'll be dropped by handle_message anyway).
575    ///
576    /// Crypto-heavy paths (aggregate signature verification) are run via
577    /// `tokio::task::block_in_place` so the async event loop remains responsive
578    /// while Ed25519 batch verification runs on the current OS thread.
579    fn verify_message(&self, msg: &ConsensusMessage) -> bool {
580        // Skip verification for non-Propose past-view messages — these may have
581        // been signed by a previous epoch's validator set. They'll be dropped by
582        // view checks. Propose messages are always verified because they may still
583        // be stored (for chain continuity in fast-forward).
584        let msg_view = match msg {
585            ConsensusMessage::Propose { .. } => None, // always verify proposals
586            ConsensusMessage::VoteMsg(v) | ConsensusMessage::Vote2Msg(v) => Some(v.view),
587            ConsensusMessage::Prepare { certificate, .. } => Some(certificate.view),
588            ConsensusMessage::Wish { target_view, .. } => Some(*target_view),
589            ConsensusMessage::TimeoutCert(tc) => Some(ViewNumber(tc.view.as_u64() + 1)),
590            ConsensusMessage::StatusCert { .. } => None,
591            ConsensusMessage::Evidence(_) => None, // always accept evidence
592        };
593        if let Some(v) = msg_view
594            && v < self.state.current_view
595        {
596            return true; // will be dropped by handler
597        }
598
599        let vs = &self.state.validator_set;
600        match msg {
601            ConsensusMessage::Propose {
602                block,
603                justify,
604                signature,
605                ..
606            } => {
607                let proposer = vs.get(block.proposer);
608                let Some(vi) = proposer else {
609                    warn!(proposer = %block.proposer, "propose from unknown validator");
610                    return false;
611                };
612                let mut proposal_ok = false;
613                for epoch in self.verification_epochs() {
614                    let bytes = view_protocol::proposal_signing_bytes(
615                        &self.state.chain_id_hash,
616                        epoch,
617                        block,
618                        justify,
619                    );
620                    if self.verifier.verify(&vi.public_key, &bytes, signature) {
621                        proposal_ok = true;
622                        break;
623                    }
624                }
625                if !proposal_ok {
626                    warn!(proposer = %block.proposer, "invalid proposal signature");
627                    return false;
628                }
629                // Verify justify QC aggregate signature (skip genesis QC which has no signers)
630                if justify.aggregate_signature.count() > 0 {
631                    let qc_bytes = Vote::signing_bytes(
632                        &self.state.chain_id_hash,
633                        justify.epoch,
634                        justify.view,
635                        &justify.block_hash,
636                        VoteType::Vote,
637                    );
638                    if !self
639                        .verifier
640                        .verify_aggregate(vs, &qc_bytes, &justify.aggregate_signature)
641                    {
642                        warn!(proposer = %block.proposer, "invalid justify QC aggregate signature");
643                        return false;
644                    }
645                    if !hotmint_crypto::has_quorum(vs, &justify.aggregate_signature) {
646                        warn!(proposer = %block.proposer, "justify QC below quorum threshold");
647                        return false;
648                    }
649                }
650                true
651            }
652            ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
653                let Some(vi) = vs.get(vote.validator) else {
654                    warn!(validator = %vote.validator, "vote from unknown validator");
655                    return false;
656                };
657                let mut ok = false;
658                for epoch in self.verification_epochs() {
659                    let bytes = Vote::signing_bytes(
660                        &self.state.chain_id_hash,
661                        epoch,
662                        vote.view,
663                        &vote.block_hash,
664                        vote.vote_type,
665                    );
666                    if self
667                        .verifier
668                        .verify(&vi.public_key, &bytes, &vote.signature)
669                    {
670                        ok = true;
671                        break;
672                    }
673                }
674                if !ok {
675                    warn!(validator = %vote.validator, "invalid vote signature");
676                    return false;
677                }
678                true
679            }
680            ConsensusMessage::Prepare {
681                certificate,
682                signature,
683            } => {
684                // Verify the leader's signature on the prepare message
685                let Some(leader) = vs.leader_for_view(certificate.view) else {
686                    return false;
687                };
688                let mut prepare_ok = false;
689                for epoch in self.verification_epochs() {
690                    let bytes = view_protocol::prepare_signing_bytes(
691                        &self.state.chain_id_hash,
692                        epoch,
693                        certificate,
694                    );
695                    if self.verifier.verify(&leader.public_key, &bytes, signature) {
696                        prepare_ok = true;
697                        break;
698                    }
699                }
700                if !prepare_ok {
701                    warn!(view = %certificate.view, "invalid prepare signature");
702                    return false;
703                }
704                // Also verify the QC's aggregate signature and quorum
705                let qc_bytes = Vote::signing_bytes(
706                    &self.state.chain_id_hash,
707                    certificate.epoch,
708                    certificate.view,
709                    &certificate.block_hash,
710                    VoteType::Vote,
711                );
712                if !self
713                    .verifier
714                    .verify_aggregate(vs, &qc_bytes, &certificate.aggregate_signature)
715                {
716                    warn!(view = %certificate.view, "invalid QC aggregate signature");
717                    return false;
718                }
719                if !hotmint_crypto::has_quorum(vs, &certificate.aggregate_signature) {
720                    warn!(view = %certificate.view, "Prepare QC below quorum threshold");
721                    return false;
722                }
723                true
724            }
725            ConsensusMessage::Wish {
726                target_view,
727                validator,
728                highest_qc,
729                signature,
730            } => {
731                let Some(vi) = vs.get(*validator) else {
732                    warn!(validator = %validator, "wish from unknown validator");
733                    return false;
734                };
735                // Signing bytes bind both target_view and highest_qc to prevent replay.
736                let mut wish_ok = false;
737                for epoch in self.verification_epochs() {
738                    let bytes = crate::pacemaker::wish_signing_bytes(
739                        &self.state.chain_id_hash,
740                        epoch,
741                        *target_view,
742                        highest_qc.as_ref(),
743                    );
744                    if self.verifier.verify(&vi.public_key, &bytes, signature) {
745                        wish_ok = true;
746                        break;
747                    }
748                }
749                if !wish_ok {
750                    warn!(validator = %validator, "invalid wish signature");
751                    return false;
752                }
753                true
754            }
755            ConsensusMessage::TimeoutCert(tc) => {
756                // The TC's aggregate signature is a collection of individual Ed25519 signatures,
757                // each signed over wish_signing_bytes(target_view, signer_highest_qc).
758                // Because each validator may have a different highest_qc, we verify per-signer
759                // using tc.highest_qcs[i] (indexed by validator slot).
760                // This also enforces quorum: we sum voting power of verified signers.
761                let target_view = ViewNumber(tc.view.as_u64() + 1);
762                let n = vs.validator_count();
763                if tc.aggregate_signature.signers.len() != n {
764                    warn!(view = %tc.view, "TC signers bitfield length mismatch");
765                    return false;
766                }
767                let mut sig_idx = 0usize;
768                let mut power = 0u64;
769                for (i, &signed) in tc.aggregate_signature.signers.iter().enumerate() {
770                    if !signed {
771                        continue;
772                    }
773                    let Some(vi) = vs.validators().get(i) else {
774                        warn!(view = %tc.view, validator_idx = i, "TC signer index out of validator set");
775                        return false;
776                    };
777                    let hqc = tc.highest_qcs.get(i).and_then(|h| h.as_ref());
778                    if sig_idx >= tc.aggregate_signature.signatures.len() {
779                        warn!(view = %tc.view, "TC aggregate_signature has fewer sigs than signers");
780                        return false;
781                    }
782                    let mut tc_sig_ok = false;
783                    for epoch in self.verification_epochs() {
784                        let bytes = crate::pacemaker::wish_signing_bytes(
785                            &self.state.chain_id_hash,
786                            epoch,
787                            target_view,
788                            hqc,
789                        );
790                        if self.verifier.verify(
791                            &vi.public_key,
792                            &bytes,
793                            &tc.aggregate_signature.signatures[sig_idx],
794                        ) {
795                            tc_sig_ok = true;
796                            break;
797                        }
798                    }
799                    if !tc_sig_ok {
800                        warn!(view = %tc.view, validator = %vi.id, "TC signer signature invalid");
801                        return false;
802                    }
803                    power += vs.power_of(vi.id);
804                    sig_idx += 1;
805                }
806                if sig_idx != tc.aggregate_signature.signatures.len() {
807                    warn!(view = %tc.view, "TC has extra signatures beyond bitfield");
808                    return false;
809                }
810                if power < vs.quorum_threshold() {
811                    warn!(view = %tc.view, power, threshold = vs.quorum_threshold(), "TC insufficient quorum");
812                    return false;
813                }
814                true
815            }
816            ConsensusMessage::StatusCert {
817                locked_qc,
818                validator,
819                signature,
820            } => {
821                let Some(vi) = vs.get(*validator) else {
822                    warn!(validator = %validator, "status from unknown validator");
823                    return false;
824                };
825                let mut status_ok = false;
826                for epoch in self.verification_epochs() {
827                    let bytes = view_protocol::status_signing_bytes(
828                        &self.state.chain_id_hash,
829                        epoch,
830                        self.state.current_view,
831                        locked_qc,
832                    );
833                    if self.verifier.verify(&vi.public_key, &bytes, signature) {
834                        status_ok = true;
835                        break;
836                    }
837                }
838                if !status_ok {
839                    warn!(validator = %validator, "invalid status signature");
840                    return false;
841                }
842                true
843            }
844            ConsensusMessage::Evidence(_) => {
845                // Evidence gossip does not carry an outer signature;
846                // the proof itself contains the conflicting vote signatures
847                // which are verified by the application layer.
848                true
849            }
850        }
851    }
852
853    async fn handle_message(
854        &mut self,
855        _sender: Option<ValidatorId>,
856        msg: ConsensusMessage,
857    ) -> Result<()> {
858        // Run signature verification in a blocking context so that the tokio
859        // event loop is not stalled by CPU-intensive Ed25519 batch operations.
860        // block_in_place yields the current thread to the scheduler while the
861        // blocking work runs, keeping timers and I/O tasks responsive.
862        let verified = tokio::task::block_in_place(|| self.verify_message(&msg));
863        if !verified {
864            return Ok(());
865        }
866
867        match msg {
868            ConsensusMessage::Propose {
869                block,
870                justify,
871                double_cert,
872                signature: _,
873            } => {
874                let block = *block;
875                let justify = *justify;
876                let double_cert = double_cert.map(|dc| *dc);
877
878                // If proposal is from a future view, advance to it first
879                if block.view > self.state.current_view {
880                    if let Some(ref dc) = double_cert {
881                        if !tokio::task::block_in_place(|| self.validate_double_cert(dc)) {
882                            return Ok(());
883                        }
884
885                        // Fast-forward via double cert
886                        self.apply_commit(dc, "fast-forward").await;
887                        self.state.highest_double_cert = Some(dc.clone());
888                        self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()))
889                            .await;
890                    } else {
891                        return Ok(());
892                    }
893                } else if block.view < self.state.current_view {
894                    // Still store blocks from past views if we haven't committed
895                    // that height yet. This handles the case where fast-forward
896                    // advanced our view but we missed storing the block from the
897                    // earlier proposal. Without this, chain commits that walk
898                    // the parent chain would fail with "block not found".
899                    if block.height > self.state.last_committed_height {
900                        // Verify block hash before storing past-view blocks
901                        let expected = hotmint_crypto::compute_block_hash(&block);
902                        if block.hash == expected {
903                            let mut store = self.store.write().await;
904                            store.put_block(block);
905                        }
906                    }
907                    return Ok(());
908                }
909
910                let mut store = self.store.write().await;
911
912                // R-25: verify any DoubleCert in the same-view proposal path.
913                // The future-view path already calls validate_double_cert; the same-view path
914                // passes the DC straight to on_proposal → try_commit without verification.
915                // A Byzantine leader could inject a forged DC to trigger incorrect commits.
916                if let Some(ref dc) = double_cert
917                    && !tokio::task::block_in_place(|| self.validate_double_cert(dc))
918                {
919                    return Ok(());
920                }
921
922                // R-28: persist justify QC as commit evidence for the block it certifies.
923                // When blocks are committed via the 2-chain rule (possibly multiple blocks at
924                // once), the innermost block gets its own commit QC, but ancestor blocks only
925                // get the chain-rule commit and have no stored QC.  Storing the justify QC here
926                // ensures that sync responders can later serve those ancestor blocks with proof.
927                if justify.aggregate_signature.count() > 0
928                    && let Some(justified_block) = store.get_block(&justify.block_hash)
929                    && store.get_commit_qc(justified_block.height).is_none()
930                {
931                    store.put_commit_qc(justified_block.height, justify.clone());
932                }
933
934                let maybe_pending = view_protocol::on_proposal(
935                    &mut self.state,
936                    view_protocol::ProposalData {
937                        block,
938                        justify,
939                        double_cert,
940                    },
941                    store.as_mut(),
942                    self.network.as_ref(),
943                    self.app.as_ref(),
944                    self.signer.as_ref(),
945                )
946                .c(d!())?;
947                drop(store);
948
949                if let Some(epoch) = maybe_pending {
950                    self.pending_epoch = Some(epoch);
951                }
952            }
953
954            ConsensusMessage::VoteMsg(vote) => {
955                if vote.view != self.state.current_view {
956                    return Ok(());
957                }
958                if !self.state.is_leader() {
959                    return Ok(());
960                }
961                if vote.vote_type != VoteType::Vote {
962                    return Ok(());
963                }
964
965                let result = self
966                    .vote_collector
967                    .add_vote(
968                        &self.state.validator_set,
969                        vote,
970                        self.state.current_epoch.number,
971                    )
972                    .c(d!())?;
973                self.handle_equivocation(&result);
974                if let Some(qc) = result.qc {
975                    self.on_qc_formed(qc).await;
976                }
977            }
978
979            ConsensusMessage::Prepare {
980                certificate,
981                signature: _,
982            } => {
983                if certificate.view < self.state.current_view {
984                    return Ok(());
985                }
986                if certificate.view == self.state.current_view {
987                    // Validate the Prepare's block app_hash if we have the block in
988                    // store. Prevents locking onto a block whose app_hash diverges from
989                    // our local state. When the block is absent (node caught up via TC),
990                    // we defer to the QC's 2f+1 signatures for safety.
991                    let store = self.store.read().await;
992                    let block_opt = store.get_block(&certificate.block_hash);
993                    if self.app.tracks_app_hash()
994                        && let Some(ref block) = block_opt
995                        && block.app_hash != self.state.last_app_hash
996                    {
997                        warn!(
998                            block_app_hash = %block.app_hash,
999                            local_app_hash = %self.state.last_app_hash,
1000                            "prepare block app_hash mismatch, ignoring"
1001                        );
1002                        return Ok(());
1003                    }
1004
1005                    // Generate vote extension for Vote2 (ABCI++ Vote Extensions).
1006                    // Only if we have the block available and have voting power.
1007                    let vote_extension = block_opt.and_then(|block| {
1008                        let ctx = BlockContext {
1009                            height: block.height,
1010                            view: self.state.current_view,
1011                            proposer: block.proposer,
1012                            epoch: self.state.current_epoch.number,
1013                            epoch_start_view: self.state.current_epoch.start_view,
1014                            validator_set: &self.state.validator_set,
1015                        };
1016                        self.app.extend_vote(&block, &ctx)
1017                    });
1018                    drop(store);
1019
1020                    view_protocol::on_prepare(
1021                        &mut self.state,
1022                        certificate,
1023                        self.network.as_ref(),
1024                        self.signer.as_ref(),
1025                        vote_extension,
1026                    );
1027                }
1028            }
1029
1030            ConsensusMessage::Vote2Msg(vote) => {
1031                if vote.view != self.state.current_view {
1032                    return Ok(());
1033                }
1034                if vote.vote_type != VoteType::Vote2 {
1035                    return Ok(());
1036                }
1037
1038                // Verify vote extension (ABCI++ Vote Extensions) if present.
1039                if let Some(ref ext) = vote.extension
1040                    && !self
1041                        .app
1042                        .verify_vote_extension(ext, &vote.block_hash, vote.validator)
1043                {
1044                    warn!(
1045                        validator = %vote.validator,
1046                        view = %vote.view,
1047                        "rejecting vote2: invalid vote extension"
1048                    );
1049                    return Ok(());
1050                }
1051
1052                let result = self
1053                    .vote_collector
1054                    .add_vote(
1055                        &self.state.validator_set,
1056                        vote,
1057                        self.state.current_epoch.number,
1058                    )
1059                    .c(d!())?;
1060                self.handle_equivocation(&result);
1061                if let Some(outer_qc) = result.qc {
1062                    self.on_double_cert_formed(outer_qc).await;
1063                }
1064            }
1065
1066            ConsensusMessage::Wish {
1067                target_view,
1068                validator,
1069                highest_qc,
1070                signature,
1071            } => {
1072                // Validate carried highest_qc (C4 mitigation).
1073                // Both signature authenticity and 2f+1 quorum weight must pass.
1074                if let Some(ref qc) = highest_qc
1075                    && qc.aggregate_signature.count() > 0
1076                {
1077                    let qc_bytes = Vote::signing_bytes(
1078                        &self.state.chain_id_hash,
1079                        qc.epoch,
1080                        qc.view,
1081                        &qc.block_hash,
1082                        VoteType::Vote,
1083                    );
1084                    if !tokio::task::block_in_place(|| {
1085                        self.verifier.verify_aggregate(
1086                            &self.state.validator_set,
1087                            &qc_bytes,
1088                            &qc.aggregate_signature,
1089                        )
1090                    }) {
1091                        warn!(validator = %validator, "wish carries invalid highest_qc signature");
1092                        return Ok(());
1093                    }
1094                    if !hotmint_crypto::has_quorum(
1095                        &self.state.validator_set,
1096                        &qc.aggregate_signature,
1097                    ) {
1098                        warn!(validator = %validator, "wish carries highest_qc without quorum");
1099                        return Ok(());
1100                    }
1101                }
1102
1103                if let Some(tc) = self.pacemaker.add_wish(
1104                    &self.state.validator_set,
1105                    target_view,
1106                    validator,
1107                    highest_qc,
1108                    signature,
1109                ) {
1110                    info!(
1111                        validator = %self.state.validator_id,
1112                        view = %tc.view,
1113                        "TC formed, advancing view"
1114                    );
1115                    self.network
1116                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1117                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1118                }
1119            }
1120
1121            ConsensusMessage::TimeoutCert(tc) => {
1122                if self.pacemaker.should_relay_tc(&tc) {
1123                    self.network
1124                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1125                }
1126                let new_view = ViewNumber(tc.view.as_u64() + 1);
1127                if new_view > self.state.current_view {
1128                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1129                }
1130            }
1131
1132            ConsensusMessage::StatusCert {
1133                locked_qc,
1134                validator,
1135                signature: _,
1136            } => {
1137                if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
1138                    if let Some(ref qc) = locked_qc {
1139                        self.state.update_highest_qc(qc);
1140                    }
1141                    self.status_senders.insert(validator);
1142                    let status_power: u64 = self
1143                        .status_senders
1144                        .iter()
1145                        .map(|v| self.state.validator_set.power_of(*v))
1146                        .sum();
1147                    // Leader's own power counts toward quorum
1148                    let total_power =
1149                        status_power + self.state.validator_set.power_of(self.state.validator_id);
1150                    if total_power >= self.state.validator_set.quorum_threshold() {
1151                        self.try_propose().await;
1152                    }
1153                }
1154            }
1155
1156            ConsensusMessage::Evidence(proof) => {
1157                // C-6: Cryptographically verify evidence before accepting.
1158                // Both signatures must be valid from the alleged validator for
1159                // different block hashes at the same (view, vote_type).
1160                let vs = &self.state.validator_set;
1161                let vi = match vs.get(proof.validator) {
1162                    Some(vi) => vi,
1163                    None => {
1164                        warn!(validator = %proof.validator, "evidence for unknown validator");
1165                        return Ok(());
1166                    }
1167                };
1168                if proof.block_hash_a == proof.block_hash_b {
1169                    warn!(validator = %proof.validator, "evidence has identical block hashes");
1170                    return Ok(());
1171                }
1172                // Use the epoch from the proof itself (not local epoch) so
1173                // cross-epoch evidence can be verified correctly.
1174                let bytes_a = Vote::signing_bytes(
1175                    &self.state.chain_id_hash,
1176                    proof.epoch,
1177                    proof.view,
1178                    &proof.block_hash_a,
1179                    proof.vote_type,
1180                );
1181                let bytes_b = Vote::signing_bytes(
1182                    &self.state.chain_id_hash,
1183                    proof.epoch,
1184                    proof.view,
1185                    &proof.block_hash_b,
1186                    proof.vote_type,
1187                );
1188                if !self
1189                    .verifier
1190                    .verify(&vi.public_key, &bytes_a, &proof.signature_a)
1191                    || !self
1192                        .verifier
1193                        .verify(&vi.public_key, &bytes_b, &proof.signature_b)
1194                {
1195                    warn!(validator = %proof.validator, "evidence has invalid signatures");
1196                    return Ok(());
1197                }
1198
1199                info!(
1200                    validator = %proof.validator,
1201                    view = %proof.view,
1202                    "received valid evidence gossip"
1203                );
1204                if let Err(e) = self.app.on_evidence(&proof) {
1205                    warn!(error = %e, "on_evidence callback failed for gossiped proof");
1206                }
1207                if let Some(ref mut store) = self.evidence_store {
1208                    store.put_evidence(proof);
1209                }
1210            }
1211        }
1212        Ok(())
1213    }
1214
1215    fn handle_equivocation(&mut self, result: &crate::vote_collector::VoteResult) {
1216        if let Some(ref proof) = result.equivocation {
1217            warn!(
1218                validator = %proof.validator,
1219                view = %proof.view,
1220                "equivocation detected!"
1221            );
1222            if let Err(e) = self.app.on_evidence(proof) {
1223                warn!(error = %e, "on_evidence callback failed");
1224            }
1225            self.network.broadcast_evidence(proof);
1226            if let Some(ref mut store) = self.evidence_store {
1227                store.put_evidence(proof.clone());
1228            }
1229        }
1230    }
1231
1232    async fn on_qc_formed(&mut self, qc: QuorumCertificate) {
1233        // Save the QC so we can reliably pair it when forming a DoubleCert
1234        self.current_view_qc = Some(qc.clone());
1235
1236        view_protocol::on_votes_collected(
1237            &mut self.state,
1238            qc.clone(),
1239            self.network.as_ref(),
1240            self.signer.as_ref(),
1241        );
1242
1243        // Leader also does vote2 for its own prepare (self-vote for step 5)
1244        // Generate vote extension (ABCI++ Vote Extensions) if the block is available.
1245        let vote_extension = {
1246            let store = self.store.read().await;
1247            store.get_block(&qc.block_hash).and_then(|block| {
1248                let ctx = BlockContext {
1249                    height: block.height,
1250                    view: self.state.current_view,
1251                    proposer: block.proposer,
1252                    epoch: self.state.current_epoch.number,
1253                    epoch_start_view: self.state.current_epoch.start_view,
1254                    validator_set: &self.state.validator_set,
1255                };
1256                self.app.extend_vote(&block, &ctx)
1257            })
1258        };
1259        let vote_bytes = Vote::signing_bytes(
1260            &self.state.chain_id_hash,
1261            self.state.current_epoch.number,
1262            self.state.current_view,
1263            &qc.block_hash,
1264            VoteType::Vote2,
1265        );
1266        let signature = self.signer.sign(&vote_bytes);
1267        let vote = Vote {
1268            block_hash: qc.block_hash,
1269            view: self.state.current_view,
1270            validator: self.state.validator_id,
1271            signature,
1272            vote_type: VoteType::Vote2,
1273            extension: vote_extension,
1274        };
1275
1276        // Lock on this QC
1277        self.state.update_locked_qc(&qc);
1278
1279        let next_leader_id =
1280            leader::next_leader(&self.state.validator_set, self.state.current_view);
1281        if next_leader_id == self.state.validator_id {
1282            // We are the next leader, collect vote2 locally
1283            match self.vote_collector.add_vote(
1284                &self.state.validator_set,
1285                vote,
1286                self.state.current_epoch.number,
1287            ) {
1288                Ok(result) => {
1289                    self.handle_equivocation(&result);
1290                    if let Some(outer_qc) = result.qc {
1291                        self.on_double_cert_formed(outer_qc).await;
1292                    }
1293                }
1294                Err(e) => warn!(error = %e, "failed to add self vote2"),
1295            }
1296        } else {
1297            self.network
1298                .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
1299        }
1300    }
1301
1302    async fn on_double_cert_formed(&mut self, outer_qc: QuorumCertificate) {
1303        // Use the QC we explicitly saved from this view's first voting round
1304        let inner_qc = match self.current_view_qc.take() {
1305            Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
1306            _ => {
1307                // Fallback to locked_qc or highest_qc
1308                match &self.state.locked_qc {
1309                    Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
1310                    _ => match &self.state.highest_qc {
1311                        Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
1312                        _ => {
1313                            warn!(
1314                                validator = %self.state.validator_id,
1315                                "double cert formed but can't find matching inner QC"
1316                            );
1317                            return;
1318                        }
1319                    },
1320                }
1321            }
1322        };
1323
1324        let dc = DoubleCertificate { inner_qc, outer_qc };
1325
1326        info!(
1327            validator = %self.state.validator_id,
1328            view = %self.state.current_view,
1329            hash = %dc.inner_qc.block_hash,
1330            "double certificate formed, committing"
1331        );
1332
1333        // Commit
1334        self.apply_commit(&dc, "double-cert").await;
1335
1336        self.state.highest_double_cert = Some(dc.clone());
1337
1338        // Advance to next view — as new leader, include DC in proposal
1339        self.advance_view(ViewEntryTrigger::DoubleCert(dc)).await;
1340    }
1341
1342    async fn handle_timeout(&mut self) {
1343        // Skip wish building/signing entirely when we have no voting power (fullnodes).
1344        // build_wish involves a cryptographic signing operation that serves no purpose
1345        // when the wish will never be broadcast or counted toward a TC.
1346        let has_power = self.state.validator_set.power_of(self.state.validator_id) > 0;
1347        if !has_power {
1348            self.pacemaker.on_timeout();
1349            return;
1350        }
1351
1352        info!(
1353            validator = %self.state.validator_id,
1354            view = %self.state.current_view,
1355            "view timeout, sending wish"
1356        );
1357
1358        let wish = self.pacemaker.build_wish(
1359            &self.state.chain_id_hash,
1360            self.state.current_epoch.number,
1361            self.state.current_view,
1362            self.state.validator_id,
1363            self.state.highest_qc.clone(),
1364            self.signer.as_ref(),
1365        );
1366
1367        self.network.broadcast(wish.clone());
1368
1369        // Also process our own wish
1370        if let ConsensusMessage::Wish {
1371            target_view,
1372            validator,
1373            highest_qc,
1374            signature,
1375        } = wish
1376            && let Some(tc) = self.pacemaker.add_wish(
1377                &self.state.validator_set,
1378                target_view,
1379                validator,
1380                highest_qc,
1381                signature,
1382            )
1383        {
1384            self.network
1385                .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1386            self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1387            return;
1388        }
1389
1390        // Exponential backoff on repeated timeouts
1391        self.pacemaker.on_timeout();
1392    }
1393
1394    /// Apply the result of a successful try_commit: update app_hash, pending epoch,
1395    /// store commit QCs, and flush. Called from both normal and fast-forward commit paths.
1396    async fn apply_commit(&mut self, dc: &DoubleCertificate, context: &str) {
1397        let store = self.store.read().await;
1398        match try_commit(
1399            dc,
1400            store.as_ref(),
1401            self.app.as_ref(),
1402            &mut self.state.last_committed_height,
1403            &self.state.current_epoch,
1404        ) {
1405            Ok(result) => {
1406                if !result.committed_blocks.is_empty() {
1407                    self.state.last_app_hash = result.last_app_hash;
1408                }
1409                if result.pending_epoch.is_some() {
1410                    self.pending_epoch = result.pending_epoch;
1411                }
1412                drop(store);
1413                {
1414                    let mut s = self.store.write().await;
1415                    for block in &result.committed_blocks {
1416                        // R-28: only write the commit QC for the block it actually certifies.
1417                        // Ancestor blocks committed via the chain rule may get their QC stored
1418                        // by the justify-QC persistence in handle_message (Propose path).
1419                        // Writing the wrong QC (mismatched block_hash) here would cause sync
1420                        // verification failures for those ancestor blocks.
1421                        if result.commit_qc.block_hash == block.hash {
1422                            s.put_commit_qc(block.height, result.commit_qc.clone());
1423                        }
1424                    }
1425                    s.flush();
1426                }
1427                // C-3: Mark evidence as committed for views included in committed blocks.
1428                if let Some(ref mut ev_store) = self.evidence_store {
1429                    for block in &result.committed_blocks {
1430                        for proof in ev_store.get_pending() {
1431                            if proof.view <= block.view {
1432                                ev_store.mark_committed(proof.view, proof.validator);
1433                            }
1434                        }
1435                    }
1436                }
1437                // C-7: Persist consensus state immediately after committing blocks so
1438                // that a crash between apply_commit and the next advance_view does not
1439                // lose the updated last_committed_height / app_hash / epoch.
1440                self.persist_state();
1441            }
1442            Err(e) => {
1443                warn!(error = %e, "try_commit failed during {context}");
1444                drop(store);
1445            }
1446        }
1447    }
1448
1449    /// Cryptographically validate a DoubleCertificate:
1450    /// 1. inner and outer QC must reference the same block hash
1451    /// 2. inner QC aggregate signature (Vote1) must be valid and reach quorum
1452    /// 3. outer QC aggregate signature (Vote2) must be valid and reach quorum
1453    ///
1454    /// Note on quorum and epoch transitions: DCs are always formed in the same epoch as
1455    /// the block they commit (vote_collector enforces quorum at formation time).  When
1456    /// a DC is received by a node that has already transitioned to a new epoch, the
1457    /// validator set may differ.  We enforce quorum against the current validator set
1458    /// as the best available reference; a legitimate DC from a prior epoch should still
1459    /// satisfy quorum against the new set unless the set shrank significantly.
1460    fn validate_double_cert(&self, dc: &DoubleCertificate) -> bool {
1461        if dc.inner_qc.block_hash != dc.outer_qc.block_hash {
1462            warn!("double cert inner/outer block_hash mismatch");
1463            return false;
1464        }
1465        let vs = &self.state.validator_set;
1466        let inner_bytes = Vote::signing_bytes(
1467            &self.state.chain_id_hash,
1468            dc.inner_qc.epoch,
1469            dc.inner_qc.view,
1470            &dc.inner_qc.block_hash,
1471            VoteType::Vote,
1472        );
1473        if !self
1474            .verifier
1475            .verify_aggregate(vs, &inner_bytes, &dc.inner_qc.aggregate_signature)
1476        {
1477            warn!("double cert inner QC signature invalid");
1478            return false;
1479        }
1480        if !hotmint_crypto::has_quorum(vs, &dc.inner_qc.aggregate_signature) {
1481            warn!("double cert inner QC below quorum threshold");
1482            return false;
1483        }
1484        let outer_bytes = Vote::signing_bytes(
1485            &self.state.chain_id_hash,
1486            dc.outer_qc.epoch,
1487            dc.outer_qc.view,
1488            &dc.outer_qc.block_hash,
1489            VoteType::Vote2,
1490        );
1491        if !self
1492            .verifier
1493            .verify_aggregate(vs, &outer_bytes, &dc.outer_qc.aggregate_signature)
1494        {
1495            warn!("double cert outer QC signature invalid");
1496            return false;
1497        }
1498        if !hotmint_crypto::has_quorum(vs, &dc.outer_qc.aggregate_signature) {
1499            warn!("double cert outer QC below quorum threshold");
1500            return false;
1501        }
1502        true
1503    }
1504
1505    fn persist_state(&mut self) {
1506        if let Some(p) = self.persistence.as_mut() {
1507            p.save_current_view(self.state.current_view);
1508            if let Some(ref qc) = self.state.locked_qc {
1509                p.save_locked_qc(qc);
1510            }
1511            if let Some(ref qc) = self.state.highest_qc {
1512                p.save_highest_qc(qc);
1513            }
1514            p.save_last_committed_height(self.state.last_committed_height);
1515            p.save_current_epoch(&self.state.current_epoch);
1516            p.save_last_app_hash(self.state.last_app_hash);
1517            p.flush();
1518        }
1519    }
1520
1521    async fn advance_view(&mut self, trigger: ViewEntryTrigger) {
1522        let new_view = match &trigger {
1523            ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
1524            ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
1525            ViewEntryTrigger::Genesis => ViewNumber(1),
1526        };
1527        self.advance_view_to(new_view, trigger).await;
1528    }
1529
1530    async fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
1531        if new_view <= self.state.current_view {
1532            return;
1533        }
1534
1535        // Reset backoff on successful progress (DoubleCert path)
1536        let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
1537
1538        self.vote_collector.clear_view(self.state.current_view);
1539        self.vote_collector.prune_before(self.state.current_view);
1540        self.pacemaker.clear_view(self.state.current_view);
1541        self.pacemaker.prune_before(self.state.current_view);
1542        self.status_senders.clear();
1543        self.current_view_qc = None;
1544
1545        // Epoch transition: apply pending validator set change when we reach the
1546        // epoch's start_view. The start_view is set deterministically (commit_view + 2)
1547        // so all honest nodes apply the transition at the same view.
1548        if self
1549            .pending_epoch
1550            .as_ref()
1551            .is_some_and(|e| new_view >= e.start_view)
1552        {
1553            // SAFETY: we just verified `pending_epoch` is `Some` above.
1554            let Some(new_epoch) = self.pending_epoch.take() else {
1555                unreachable!("pending_epoch was Some in the condition check");
1556            };
1557            info!(
1558                validator = %self.state.validator_id,
1559                old_epoch = %self.state.current_epoch.number,
1560                new_epoch = %new_epoch.number,
1561                start_view = %new_epoch.start_view,
1562                validators = new_epoch.validator_set.validator_count(),
1563                "epoch transition"
1564            );
1565            self.state.validator_set = new_epoch.validator_set.clone();
1566            self.state.current_epoch = new_epoch;
1567            // Notify network layer of the new validator set and epoch
1568            self.network
1569                .on_epoch_change(self.state.current_epoch.number, &self.state.validator_set);
1570            // Full clear: old votes/wishes are from the previous epoch's validator set
1571            self.vote_collector = VoteCollector::new();
1572            self.pacemaker = Pacemaker::with_config(self.pacemaker_config.clone());
1573        }
1574
1575        view_protocol::enter_view(
1576            &mut self.state,
1577            new_view,
1578            trigger,
1579            self.network.as_ref(),
1580            self.signer.as_ref(),
1581        );
1582
1583        if is_progress {
1584            self.pacemaker.reset_on_progress();
1585        } else {
1586            self.pacemaker.reset_timer();
1587        }
1588
1589        self.persist_state();
1590
1591        // If we're the leader, propose immediately.
1592        // Note: in a full implementation, the leader would collect StatusCerts
1593        // before proposing (status_senders quorum gate). Currently the immediate
1594        // propose path is required for liveness across epoch transitions where
1595        // cross-epoch verification complexity can stall status collection.
1596        if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
1597            self.try_propose().await;
1598        }
1599    }
1600}
1601
1602// ---------------------------------------------------------------------------
1603// Regression tests for sub-quorum certificate injection (R-29, R-32)
1604// ---------------------------------------------------------------------------
1605
1606#[cfg(test)]
1607mod tests {
1608    use super::*;
1609
1610    use std::sync::Arc;
1611
1612    use tokio::sync::RwLock;
1613
1614    use hotmint_crypto::{Ed25519Signer, Ed25519Verifier};
1615    use hotmint_types::Signer as SignerTrait;
1616    use hotmint_types::certificate::QuorumCertificate;
1617    use hotmint_types::crypto::AggregateSignature;
1618    use hotmint_types::epoch::EpochNumber;
1619    use hotmint_types::validator::{ValidatorId, ValidatorInfo};
1620    use hotmint_types::vote::{Vote, VoteType};
1621    use tokio::sync::mpsc;
1622
1623    use crate::application::NoopApplication;
1624    use crate::network::NetworkSink;
1625    use crate::state::ConsensusState;
1626    use crate::store::MemoryBlockStore;
1627
1628    // Minimal no-op network for unit tests — messages are silently discarded.
1629    struct DevNullNetwork;
1630    impl NetworkSink for DevNullNetwork {
1631        fn broadcast(&self, _: ConsensusMessage) {}
1632        fn send_to(&self, _: ValidatorId, _: ConsensusMessage) {}
1633    }
1634
1635    /// Default chain_id_hash for tests — matches ConsensusState::new() which uses chain_id = "".
1636    fn test_chain_id_hash() -> [u8; 32] {
1637        *blake3::hash(b"").as_bytes()
1638    }
1639
1640    fn make_validator_set_4() -> (ValidatorSet, Vec<Ed25519Signer>) {
1641        let signers: Vec<Ed25519Signer> = (0..4)
1642            .map(|i| Ed25519Signer::generate(ValidatorId(i)))
1643            .collect();
1644        let infos: Vec<ValidatorInfo> = signers
1645            .iter()
1646            .map(|s| ValidatorInfo {
1647                id: s.validator_id(),
1648                public_key: s.public_key(),
1649                power: 1,
1650            })
1651            .collect();
1652        (ValidatorSet::new(infos), signers)
1653    }
1654
1655    fn make_test_engine(
1656        vid: ValidatorId,
1657        vs: ValidatorSet,
1658        signer: Ed25519Signer,
1659    ) -> (
1660        ConsensusEngine,
1661        mpsc::Sender<(Option<ValidatorId>, ConsensusMessage)>,
1662    ) {
1663        let (tx, rx) = mpsc::channel(64);
1664        let store = Arc::new(RwLock::new(
1665            Box::new(MemoryBlockStore::new()) as Box<dyn crate::store::BlockStore>
1666        ));
1667        let state = ConsensusState::new(vid, vs);
1668        let engine = ConsensusEngine::new(
1669            state,
1670            store,
1671            Box::new(DevNullNetwork),
1672            Box::new(NoopApplication),
1673            Box::new(signer),
1674            rx,
1675            EngineConfig {
1676                verifier: Box::new(Ed25519Verifier),
1677                pacemaker: None,
1678                persistence: None,
1679                evidence_store: None,
1680            },
1681        );
1682        (engine, tx)
1683    }
1684
1685    // R-29 regression: a Propose message whose justify QC is signed by fewer than
1686    // 2f+1 validators must be rejected by verify_message().
1687    #[test]
1688    fn r29_propose_sub_quorum_justify_rejected_by_verify_message() {
1689        let (vs, signers) = make_validator_set_4();
1690        // Use a fresh signer for the engine; verify_message only needs the engine's
1691        // validator set and verifier, not its own signing key.
1692        let engine_signer = Ed25519Signer::generate(ValidatorId(0));
1693        let (engine, _tx) = make_test_engine(ValidatorId(0), vs.clone(), engine_signer);
1694
1695        // Build a justify QC signed by exactly 1 of 4 validators — below 2f+1 = 3.
1696        let chain_id_hash = test_chain_id_hash();
1697        let hash = BlockHash::GENESIS;
1698        let qc_view = ViewNumber::GENESIS;
1699        let vote_bytes = Vote::signing_bytes(
1700            &chain_id_hash,
1701            EpochNumber(0),
1702            qc_view,
1703            &hash,
1704            VoteType::Vote,
1705        );
1706        let mut agg = AggregateSignature::new(4);
1707        agg.add(1, SignerTrait::sign(&signers[1], &vote_bytes))
1708            .unwrap();
1709        let sub_quorum_qc = QuorumCertificate {
1710            block_hash: hash,
1711            view: qc_view,
1712            aggregate_signature: agg,
1713            epoch: EpochNumber(0),
1714        };
1715
1716        // Construct a proposal from V1 carrying this sub-quorum justify.
1717        let mut block = Block::genesis();
1718        block.height = Height(1);
1719        block.view = ViewNumber(1);
1720        block.proposer = ValidatorId(1);
1721        block.hash = block.compute_hash();
1722        let proposal_bytes = crate::view_protocol::proposal_signing_bytes(
1723            &chain_id_hash,
1724            EpochNumber(0),
1725            &block,
1726            &sub_quorum_qc,
1727        );
1728        let signature = SignerTrait::sign(&signers[1], &proposal_bytes);
1729
1730        let msg = ConsensusMessage::Propose {
1731            block: Box::new(block),
1732            justify: Box::new(sub_quorum_qc),
1733            double_cert: None,
1734            signature,
1735        };
1736
1737        assert!(
1738            !engine.verify_message(&msg),
1739            "R-29 regression: Propose with sub-quorum justify QC must be rejected by verify_message"
1740        );
1741    }
1742
1743    // R-29 regression: a Propose message with a full quorum justify QC (3/4) must pass.
1744    #[test]
1745    fn r29_propose_full_quorum_justify_accepted_by_verify_message() {
1746        let (vs, signers) = make_validator_set_4();
1747        let engine_signer = Ed25519Signer::generate(ValidatorId(0));
1748        let (engine, _tx) = make_test_engine(ValidatorId(0), vs.clone(), engine_signer);
1749
1750        let chain_id_hash = test_chain_id_hash();
1751        let hash = BlockHash::GENESIS;
1752        let qc_view = ViewNumber::GENESIS;
1753        let vote_bytes = Vote::signing_bytes(
1754            &chain_id_hash,
1755            EpochNumber(0),
1756            qc_view,
1757            &hash,
1758            VoteType::Vote,
1759        );
1760        // 3 of 4 signers — meets 2f+1 threshold.
1761        let mut agg = AggregateSignature::new(4);
1762        for (i, signer) in signers.iter().take(3).enumerate() {
1763            agg.add(i, SignerTrait::sign(signer, &vote_bytes)).unwrap();
1764        }
1765        let full_quorum_qc = QuorumCertificate {
1766            block_hash: hash,
1767            view: qc_view,
1768            aggregate_signature: agg,
1769            epoch: EpochNumber(0),
1770        };
1771
1772        let mut block = Block::genesis();
1773        block.height = Height(1);
1774        block.view = ViewNumber(1);
1775        block.proposer = ValidatorId(1);
1776        block.hash = block.compute_hash();
1777        let proposal_bytes = crate::view_protocol::proposal_signing_bytes(
1778            &chain_id_hash,
1779            EpochNumber(0),
1780            &block,
1781            &full_quorum_qc,
1782        );
1783        let signature = SignerTrait::sign(&signers[1], &proposal_bytes);
1784
1785        let msg = ConsensusMessage::Propose {
1786            block: Box::new(block),
1787            justify: Box::new(full_quorum_qc),
1788            double_cert: None,
1789            signature,
1790        };
1791
1792        assert!(
1793            engine.verify_message(&msg),
1794            "R-29: Propose with full quorum justify QC must pass verify_message"
1795        );
1796    }
1797
1798    // R-32 regression: a Wish carrying a sub-quorum highest_qc must cause
1799    // verify_highest_qc_in_wish to treat the QC as invalid and return false,
1800    // which causes handle_message to discard the Wish without forwarding it
1801    // to the pacemaker.
1802    //
1803    // We verify the sub-component: has_quorum returns false for a 1-of-4 aggregate,
1804    // ensuring the guard in handle_message fires.
1805    #[test]
1806    fn r32_sub_quorum_highest_qc_fails_has_quorum() {
1807        let (vs, signers) = make_validator_set_4();
1808
1809        let chain_id_hash = test_chain_id_hash();
1810        let hash = BlockHash([1u8; 32]);
1811        let qc_view = ViewNumber(1);
1812        let vote_bytes = Vote::signing_bytes(
1813            &chain_id_hash,
1814            EpochNumber(0),
1815            qc_view,
1816            &hash,
1817            VoteType::Vote,
1818        );
1819
1820        // Build a QC with only 1 signer — sub-quorum.
1821        let mut agg = AggregateSignature::new(4);
1822        agg.add(0, SignerTrait::sign(&signers[0], &vote_bytes))
1823            .unwrap();
1824
1825        assert!(
1826            !hotmint_crypto::has_quorum(&vs, &agg),
1827            "R-32 regression: 1-of-4 signed QC must not satisfy has_quorum"
1828        );
1829
1830        // Build a QC with 3 signers — full quorum.
1831        let mut agg_full = AggregateSignature::new(4);
1832        for (i, signer) in signers.iter().take(3).enumerate() {
1833            agg_full
1834                .add(i, SignerTrait::sign(signer, &vote_bytes))
1835                .unwrap();
1836        }
1837        assert!(
1838            hotmint_crypto::has_quorum(&vs, &agg_full),
1839            "R-32: 3-of-4 signed QC must satisfy has_quorum"
1840        );
1841    }
1842}