Skip to main content

hotmint_consensus/
engine.rs

1use ruc::*;
2
3use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5
6use tokio::sync::RwLock;
7
8use crate::application::Application;
9use crate::commit::try_commit;
10use crate::leader;
11use crate::network::NetworkSink;
12use crate::pacemaker::{Pacemaker, PacemakerConfig};
13use crate::state::{ConsensusState, ViewStep};
14use crate::store::BlockStore;
15use crate::view_protocol::{self, ViewEntryTrigger};
16use crate::vote_collector::VoteCollector;
17
18use hotmint_types::epoch::Epoch;
19use hotmint_types::vote::VoteType;
20use hotmint_types::*;
21use tokio::sync::mpsc;
22use tracing::{info, warn};
23
24/// Shared block store type used by the engine, RPC, and sync responder.
25pub type SharedBlockStore = Arc<RwLock<Box<dyn BlockStore>>>;
26
27/// Trait for persisting critical consensus state across restarts.
28pub trait StatePersistence: Send {
29    fn save_current_view(&mut self, view: ViewNumber);
30    fn save_locked_qc(&mut self, qc: &QuorumCertificate);
31    fn save_highest_qc(&mut self, qc: &QuorumCertificate);
32    fn save_last_committed_height(&mut self, height: Height);
33    fn save_current_epoch(&mut self, epoch: &Epoch);
34    fn save_last_app_hash(&mut self, hash: BlockHash);
35    fn flush(&self);
36}
37
38pub struct ConsensusEngine {
39    state: ConsensusState,
40    store: SharedBlockStore,
41    network: Box<dyn NetworkSink>,
42    app: Box<dyn Application>,
43    signer: Box<dyn Signer>,
44    verifier: Box<dyn Verifier>,
45    vote_collector: VoteCollector,
46    pacemaker: Pacemaker,
47    pacemaker_config: PacemakerConfig,
48    msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
49    /// Collected unique status cert senders (for leader, per view)
50    status_senders: HashSet<ValidatorId>,
51    /// The QC formed in this view's first voting round (used to build DoubleCert)
52    current_view_qc: Option<QuorumCertificate>,
53    /// Pending epoch transition (set by try_commit, applied in advance_view_to)
54    pending_epoch: Option<Epoch>,
55    /// Optional state persistence (for crash recovery).
56    persistence: Option<Box<dyn StatePersistence>>,
57}
58
59/// Configuration for ConsensusEngine.
60pub struct EngineConfig {
61    pub verifier: Box<dyn Verifier>,
62    pub pacemaker: Option<PacemakerConfig>,
63    pub persistence: Option<Box<dyn StatePersistence>>,
64}
65
66impl EngineConfig {
67    /// Create an `EngineConfig` with the given verifier and defaults
68    /// (no custom pacemaker, no persistence).
69    pub fn new(verifier: Box<dyn Verifier>) -> Self {
70        Self {
71            verifier,
72            pacemaker: None,
73            persistence: None,
74        }
75    }
76
77    /// Set a custom pacemaker configuration.
78    pub fn with_pacemaker(mut self, pacemaker: PacemakerConfig) -> Self {
79        self.pacemaker = Some(pacemaker);
80        self
81    }
82
83    /// Set a state persistence backend.
84    pub fn with_persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
85        self.persistence = Some(persistence);
86        self
87    }
88}
89
90/// Builder for constructing a `ConsensusEngine` with a fluent API.
91///
92/// # Example
93/// ```rust,ignore
94/// let engine = ConsensusEngineBuilder::new()
95///     .state(state)
96///     .store(store)
97///     .network(network)
98///     .app(app)
99///     .signer(signer)
100///     .messages(msg_rx)
101///     .verifier(verifier)
102///     .build()
103///     .expect("all required fields must be set");
104/// ```
105pub struct ConsensusEngineBuilder {
106    state: Option<ConsensusState>,
107    store: Option<SharedBlockStore>,
108    network: Option<Box<dyn NetworkSink>>,
109    app: Option<Box<dyn Application>>,
110    signer: Option<Box<dyn Signer>>,
111    msg_rx: Option<mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>>,
112    verifier: Option<Box<dyn Verifier>>,
113    pacemaker: Option<PacemakerConfig>,
114    persistence: Option<Box<dyn StatePersistence>>,
115}
116
117impl ConsensusEngineBuilder {
118    pub fn new() -> Self {
119        Self {
120            state: None,
121            store: None,
122            network: None,
123            app: None,
124            signer: None,
125            msg_rx: None,
126            verifier: None,
127            pacemaker: None,
128            persistence: None,
129        }
130    }
131
132    pub fn state(mut self, state: ConsensusState) -> Self {
133        self.state = Some(state);
134        self
135    }
136
137    pub fn store(mut self, store: SharedBlockStore) -> Self {
138        self.store = Some(store);
139        self
140    }
141
142    pub fn network(mut self, network: Box<dyn NetworkSink>) -> Self {
143        self.network = Some(network);
144        self
145    }
146
147    pub fn app(mut self, app: Box<dyn Application>) -> Self {
148        self.app = Some(app);
149        self
150    }
151
152    pub fn signer(mut self, signer: Box<dyn Signer>) -> Self {
153        self.signer = Some(signer);
154        self
155    }
156
157    pub fn messages(
158        mut self,
159        msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
160    ) -> Self {
161        self.msg_rx = Some(msg_rx);
162        self
163    }
164
165    pub fn verifier(mut self, verifier: Box<dyn Verifier>) -> Self {
166        self.verifier = Some(verifier);
167        self
168    }
169
170    pub fn pacemaker(mut self, config: PacemakerConfig) -> Self {
171        self.pacemaker = Some(config);
172        self
173    }
174
175    pub fn persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
176        self.persistence = Some(persistence);
177        self
178    }
179
180    pub fn build(self) -> ruc::Result<ConsensusEngine> {
181        let state = self.state.ok_or_else(|| ruc::eg!("state is required"))?;
182        let store = self.store.ok_or_else(|| ruc::eg!("store is required"))?;
183        let network = self
184            .network
185            .ok_or_else(|| ruc::eg!("network is required"))?;
186        let app = self.app.ok_or_else(|| ruc::eg!("app is required"))?;
187        let signer = self.signer.ok_or_else(|| ruc::eg!("signer is required"))?;
188        let msg_rx = self
189            .msg_rx
190            .ok_or_else(|| ruc::eg!("messages (msg_rx) is required"))?;
191        let verifier = self
192            .verifier
193            .ok_or_else(|| ruc::eg!("verifier is required"))?;
194
195        let config = EngineConfig {
196            verifier,
197            pacemaker: self.pacemaker,
198            persistence: self.persistence,
199        };
200
201        Ok(ConsensusEngine::new(
202            state, store, network, app, signer, msg_rx, config,
203        ))
204    }
205}
206
207impl Default for ConsensusEngineBuilder {
208    fn default() -> Self {
209        Self::new()
210    }
211}
212
213impl ConsensusEngine {
214    pub fn new(
215        state: ConsensusState,
216        store: SharedBlockStore,
217        network: Box<dyn NetworkSink>,
218        app: Box<dyn Application>,
219        signer: Box<dyn Signer>,
220        msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
221        config: EngineConfig,
222    ) -> Self {
223        let pc = config.pacemaker.unwrap_or_default();
224        Self {
225            state,
226            store,
227            network,
228            app,
229            signer,
230            verifier: config.verifier,
231            vote_collector: VoteCollector::new(),
232            pacemaker: Pacemaker::with_config(pc.clone()),
233            pacemaker_config: pc,
234            msg_rx,
235            status_senders: HashSet::new(),
236            current_view_qc: None,
237            pending_epoch: None,
238            persistence: config.persistence,
239        }
240    }
241
242    /// Bootstrap and start the event loop.
243    /// If persisted state was restored (current_view > 1), skip genesis bootstrap.
244    pub async fn run(mut self) {
245        if self.state.current_view.as_u64() <= 1 {
246            self.enter_genesis_view().await;
247        } else {
248            info!(
249                validator = %self.state.validator_id,
250                view = %self.state.current_view,
251                height = %self.state.last_committed_height,
252                "resuming from persisted state"
253            );
254            self.pacemaker.reset_timer();
255        }
256
257        loop {
258            let deadline = self.pacemaker.sleep_until_deadline();
259            tokio::pin!(deadline);
260
261            tokio::select! {
262                Some((sender, msg)) = self.msg_rx.recv() => {
263                    if let Err(e) = self.handle_message(sender, msg).await {
264                        warn!(validator = %self.state.validator_id, error = %e, "error handling message");
265                    }
266                }
267                _ = &mut deadline => {
268                    self.handle_timeout().await;
269                }
270            }
271        }
272    }
273
274    async fn enter_genesis_view(&mut self) {
275        // Create a synthetic genesis QC so the first leader can propose
276        let genesis_qc = QuorumCertificate {
277            block_hash: BlockHash::GENESIS,
278            view: ViewNumber::GENESIS,
279            aggregate_signature: AggregateSignature::new(
280                self.state.validator_set.validator_count(),
281            ),
282        };
283        self.state.highest_qc = Some(genesis_qc);
284
285        let view = ViewNumber(1);
286        view_protocol::enter_view(
287            &mut self.state,
288            view,
289            ViewEntryTrigger::Genesis,
290            self.network.as_ref(),
291            self.signer.as_ref(),
292        );
293        self.pacemaker.reset_timer();
294
295        // If leader of genesis view, propose immediately
296        if self.state.is_leader() {
297            self.state.step = ViewStep::WaitingForStatus;
298            // In genesis, skip status wait — propose directly
299            self.try_propose().await;
300        }
301    }
302
303    fn try_propose(
304        &mut self,
305    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
306        Box::pin(async move {
307            let mut store = self.store.write().await;
308            match view_protocol::propose(
309                &mut self.state,
310                store.as_mut(),
311                self.network.as_ref(),
312                self.app.as_ref(),
313                self.signer.as_ref(),
314            ) {
315                Ok(block) => {
316                    drop(store);
317                    // Leader votes for its own block
318                    self.leader_self_vote(block.hash).await;
319                }
320                Err(e) => {
321                    warn!(
322                        validator = %self.state.validator_id,
323                        error = %e,
324                        "failed to propose"
325                    );
326                }
327            }
328        })
329    }
330
331    async fn leader_self_vote(&mut self, block_hash: BlockHash) {
332        let vote_bytes = Vote::signing_bytes(
333            &self.state.chain_id_hash,
334            self.state.current_view,
335            &block_hash,
336            VoteType::Vote,
337        );
338        let signature = self.signer.sign(&vote_bytes);
339        let vote = Vote {
340            block_hash,
341            view: self.state.current_view,
342            validator: self.state.validator_id,
343            signature,
344            vote_type: VoteType::Vote,
345        };
346        match self
347            .vote_collector
348            .add_vote(&self.state.validator_set, vote)
349        {
350            Ok(result) => {
351                self.handle_equivocation(&result);
352                if let Some(qc) = result.qc {
353                    self.on_qc_formed(qc).await;
354                }
355            }
356            Err(e) => warn!(error = %e, "failed to add self vote"),
357        }
358    }
359}
360
361/// Verify the per-sender individual signature on a consensus message before relaying.
362///
363/// Only checks signatures that can be attributed to a single known validator using the
364/// provided key map.  Aggregate certificates (TimeoutCert, justify/prepare QCs) are
365/// intentionally not fully re-verified here — the receiving engine always does the full
366/// check.  Messages whose signing bytes depend on receiver state (StatusCert needs
367/// `current_view`) are also allowed through; the engine will reject them if invalid.
368///
369/// `ordered_validators` is the validator list in round-robin order (same order used by
370/// `ValidatorSet::leader_for_view`).  Pass an empty slice to skip the leader-for-view
371/// check (e.g., in tests where the set is not available).
372///
373/// Returns `false` when:
374/// - The claimed sender is not in `validator_keys` (unknown/non-validator peer), OR
375/// - The individual signature is cryptographically invalid, OR
376/// - For Prepare: the sender is not the leader for the certificate's view.
377pub fn verify_relay_sender(
378    sender: ValidatorId,
379    msg: &ConsensusMessage,
380    validator_keys: &HashMap<ValidatorId, hotmint_types::crypto::PublicKey>,
381    ordered_validators: &[ValidatorId],
382    chain_id_hash: &[u8; 32],
383) -> bool {
384    use hotmint_crypto::Ed25519Verifier;
385    use hotmint_types::Verifier;
386    use hotmint_types::vote::Vote;
387    let verifier = Ed25519Verifier;
388    match msg {
389        ConsensusMessage::Propose {
390            block,
391            justify,
392            signature,
393            ..
394        } => {
395            let Some(pk) = validator_keys.get(&block.proposer) else {
396                return false;
397            };
398            let bytes = crate::view_protocol::proposal_signing_bytes(chain_id_hash, block, justify);
399            Verifier::verify(&verifier, pk, &bytes, signature)
400        }
401        ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
402            let Some(pk) = validator_keys.get(&vote.validator) else {
403                return false;
404            };
405            let bytes =
406                Vote::signing_bytes(chain_id_hash, vote.view, &vote.block_hash, vote.vote_type);
407            Verifier::verify(&verifier, pk, &bytes, &vote.signature)
408        }
409        ConsensusMessage::Prepare {
410            certificate,
411            signature,
412        } => {
413            // Prepare is broadcast by the current leader. Verify that the relay
414            // sender is the leader for this view, then check the signature.
415            if !ordered_validators.is_empty() {
416                let n = ordered_validators.len();
417                let expected_leader = ordered_validators[certificate.view.as_u64() as usize % n];
418                if sender != expected_leader {
419                    return false;
420                }
421            }
422            let Some(pk) = validator_keys.get(&sender) else {
423                return false;
424            };
425            let bytes = crate::view_protocol::prepare_signing_bytes(chain_id_hash, certificate);
426            Verifier::verify(&verifier, pk, &bytes, signature)
427        }
428        ConsensusMessage::Wish {
429            target_view,
430            validator,
431            highest_qc,
432            signature,
433        } => {
434            let Some(pk) = validator_keys.get(validator) else {
435                return false;
436            };
437            let bytes = crate::pacemaker::wish_signing_bytes(
438                chain_id_hash,
439                *target_view,
440                highest_qc.as_ref(),
441            );
442            Verifier::verify(&verifier, pk, &bytes, signature)
443        }
444        ConsensusMessage::TimeoutCert(tc) => {
445            // Full relay verification: verify each signer's signature + quorum check.
446            let target_view = ViewNumber(tc.view.as_u64() + 1);
447            let n = ordered_validators.len();
448            if n == 0 || tc.aggregate_signature.signers.len() != n {
449                return false;
450            }
451            let mut sig_idx = 0usize;
452            let mut verified_count = 0usize;
453            for (i, &signed) in tc.aggregate_signature.signers.iter().enumerate() {
454                if !signed {
455                    continue;
456                }
457                if i >= n {
458                    return false;
459                }
460                let vid = ordered_validators[i];
461                let Some(pk) = validator_keys.get(&vid) else {
462                    return false;
463                };
464                let hqc = tc.highest_qcs.get(i).and_then(|h| h.as_ref());
465                let bytes = crate::pacemaker::wish_signing_bytes(chain_id_hash, target_view, hqc);
466                if sig_idx >= tc.aggregate_signature.signatures.len() {
467                    return false;
468                }
469                if !Verifier::verify(
470                    &verifier,
471                    pk,
472                    &bytes,
473                    &tc.aggregate_signature.signatures[sig_idx],
474                ) {
475                    return false;
476                }
477                sig_idx += 1;
478                verified_count += 1;
479            }
480            if sig_idx != tc.aggregate_signature.signatures.len() {
481                return false;
482            }
483            // Quorum check: require > 2/3 of validators (by count, not power —
484            // full power-based check is done in engine::verify_message).
485            verified_count * 3 > n * 2
486        }
487        ConsensusMessage::StatusCert {
488            validator,
489            signature,
490            locked_qc,
491            ..
492        } => {
493            // StatusCert signing bytes require current_view which we don't have
494            // in the relay context. Verify the sender is a known validator and
495            // the signature is over *some* plausible view (the TC view is not
496            // available here). The engine does full verification with correct view.
497            // At minimum, reject unknown validators.
498            let Some(pk) = validator_keys.get(validator) else {
499                return false;
500            };
501            // We cannot construct exact signing bytes without knowing current_view,
502            // so we accept from known validators only. The engine's verify_message
503            // will do full cryptographic verification.
504            let _ = (pk, signature, locked_qc);
505            true
506        }
507    }
508}
509
510impl ConsensusEngine {
511    /// Verify the cryptographic signature on an inbound consensus message.
512    /// Returns false (and logs a warning) if verification fails.
513    /// Messages from past views are skipped (they'll be dropped by handle_message anyway).
514    fn verify_message(&self, msg: &ConsensusMessage) -> bool {
515        // Skip verification for non-Propose past-view messages — these may have
516        // been signed by a previous epoch's validator set. They'll be dropped by
517        // view checks. Propose messages are always verified because they may still
518        // be stored (for chain continuity in fast-forward).
519        let msg_view = match msg {
520            ConsensusMessage::Propose { .. } => None, // always verify proposals
521            ConsensusMessage::VoteMsg(v) | ConsensusMessage::Vote2Msg(v) => Some(v.view),
522            ConsensusMessage::Prepare { certificate, .. } => Some(certificate.view),
523            ConsensusMessage::Wish { target_view, .. } => Some(*target_view),
524            ConsensusMessage::TimeoutCert(tc) => Some(ViewNumber(tc.view.as_u64() + 1)),
525            ConsensusMessage::StatusCert { .. } => None,
526        };
527        if let Some(v) = msg_view
528            && v < self.state.current_view
529        {
530            return true; // will be dropped by handler
531        }
532
533        let vs = &self.state.validator_set;
534        match msg {
535            ConsensusMessage::Propose {
536                block,
537                justify,
538                signature,
539                ..
540            } => {
541                let proposer = vs.get(block.proposer);
542                let Some(vi) = proposer else {
543                    warn!(proposer = %block.proposer, "propose from unknown validator");
544                    return false;
545                };
546                let bytes = view_protocol::proposal_signing_bytes(
547                    &self.state.chain_id_hash,
548                    block,
549                    justify,
550                );
551                if !self.verifier.verify(&vi.public_key, &bytes, signature) {
552                    warn!(proposer = %block.proposer, "invalid proposal signature");
553                    return false;
554                }
555                // Verify justify QC aggregate signature (skip genesis QC which has no signers)
556                if justify.aggregate_signature.count() > 0 {
557                    let qc_bytes = Vote::signing_bytes(
558                        &self.state.chain_id_hash,
559                        justify.view,
560                        &justify.block_hash,
561                        VoteType::Vote,
562                    );
563                    if !self
564                        .verifier
565                        .verify_aggregate(vs, &qc_bytes, &justify.aggregate_signature)
566                    {
567                        warn!(proposer = %block.proposer, "invalid justify QC aggregate signature");
568                        return false;
569                    }
570                    if !hotmint_crypto::has_quorum(vs, &justify.aggregate_signature) {
571                        warn!(proposer = %block.proposer, "justify QC below quorum threshold");
572                        return false;
573                    }
574                }
575                true
576            }
577            ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
578                let Some(vi) = vs.get(vote.validator) else {
579                    warn!(validator = %vote.validator, "vote from unknown validator");
580                    return false;
581                };
582                let bytes = Vote::signing_bytes(
583                    &self.state.chain_id_hash,
584                    vote.view,
585                    &vote.block_hash,
586                    vote.vote_type,
587                );
588                if !self
589                    .verifier
590                    .verify(&vi.public_key, &bytes, &vote.signature)
591                {
592                    warn!(validator = %vote.validator, "invalid vote signature");
593                    return false;
594                }
595                true
596            }
597            ConsensusMessage::Prepare {
598                certificate,
599                signature,
600            } => {
601                // Verify the leader's signature on the prepare message
602                let Some(leader) = vs.leader_for_view(certificate.view) else {
603                    return false;
604                };
605                let bytes =
606                    view_protocol::prepare_signing_bytes(&self.state.chain_id_hash, certificate);
607                if !self.verifier.verify(&leader.public_key, &bytes, signature) {
608                    warn!(view = %certificate.view, "invalid prepare signature");
609                    return false;
610                }
611                // Also verify the QC's aggregate signature and quorum
612                let qc_bytes = Vote::signing_bytes(
613                    &self.state.chain_id_hash,
614                    certificate.view,
615                    &certificate.block_hash,
616                    VoteType::Vote,
617                );
618                if !self
619                    .verifier
620                    .verify_aggregate(vs, &qc_bytes, &certificate.aggregate_signature)
621                {
622                    warn!(view = %certificate.view, "invalid QC aggregate signature");
623                    return false;
624                }
625                if !hotmint_crypto::has_quorum(vs, &certificate.aggregate_signature) {
626                    warn!(view = %certificate.view, "Prepare QC below quorum threshold");
627                    return false;
628                }
629                true
630            }
631            ConsensusMessage::Wish {
632                target_view,
633                validator,
634                highest_qc,
635                signature,
636            } => {
637                let Some(vi) = vs.get(*validator) else {
638                    warn!(validator = %validator, "wish from unknown validator");
639                    return false;
640                };
641                // Signing bytes bind both target_view and highest_qc to prevent replay.
642                let bytes = crate::pacemaker::wish_signing_bytes(
643                    &self.state.chain_id_hash,
644                    *target_view,
645                    highest_qc.as_ref(),
646                );
647                if !self.verifier.verify(&vi.public_key, &bytes, signature) {
648                    warn!(validator = %validator, "invalid wish signature");
649                    return false;
650                }
651                true
652            }
653            ConsensusMessage::TimeoutCert(tc) => {
654                // The TC's aggregate signature is a collection of individual Ed25519 signatures,
655                // each signed over wish_signing_bytes(target_view, signer_highest_qc).
656                // Because each validator may have a different highest_qc, we verify per-signer
657                // using tc.highest_qcs[i] (indexed by validator slot).
658                // This also enforces quorum: we sum voting power of verified signers.
659                let target_view = ViewNumber(tc.view.as_u64() + 1);
660                let n = vs.validator_count();
661                if tc.aggregate_signature.signers.len() != n {
662                    warn!(view = %tc.view, "TC signers bitfield length mismatch");
663                    return false;
664                }
665                let mut sig_idx = 0usize;
666                let mut power = 0u64;
667                for (i, &signed) in tc.aggregate_signature.signers.iter().enumerate() {
668                    if !signed {
669                        continue;
670                    }
671                    let Some(vi) = vs.validators().get(i) else {
672                        warn!(view = %tc.view, validator_idx = i, "TC signer index out of validator set");
673                        return false;
674                    };
675                    let hqc = tc.highest_qcs.get(i).and_then(|h| h.as_ref());
676                    let bytes = crate::pacemaker::wish_signing_bytes(
677                        &self.state.chain_id_hash,
678                        target_view,
679                        hqc,
680                    );
681                    if sig_idx >= tc.aggregate_signature.signatures.len() {
682                        warn!(view = %tc.view, "TC aggregate_signature has fewer sigs than signers");
683                        return false;
684                    }
685                    if !self.verifier.verify(
686                        &vi.public_key,
687                        &bytes,
688                        &tc.aggregate_signature.signatures[sig_idx],
689                    ) {
690                        warn!(view = %tc.view, validator = %vi.id, "TC signer signature invalid");
691                        return false;
692                    }
693                    power += vs.power_of(vi.id);
694                    sig_idx += 1;
695                }
696                if sig_idx != tc.aggregate_signature.signatures.len() {
697                    warn!(view = %tc.view, "TC has extra signatures beyond bitfield");
698                    return false;
699                }
700                if power < vs.quorum_threshold() {
701                    warn!(view = %tc.view, power, threshold = vs.quorum_threshold(), "TC insufficient quorum");
702                    return false;
703                }
704                true
705            }
706            ConsensusMessage::StatusCert {
707                locked_qc,
708                validator,
709                signature,
710            } => {
711                let Some(vi) = vs.get(*validator) else {
712                    warn!(validator = %validator, "status from unknown validator");
713                    return false;
714                };
715                let bytes = view_protocol::status_signing_bytes(
716                    &self.state.chain_id_hash,
717                    self.state.current_view,
718                    locked_qc,
719                );
720                if !self.verifier.verify(&vi.public_key, &bytes, signature) {
721                    warn!(validator = %validator, "invalid status signature");
722                    return false;
723                }
724                true
725            }
726        }
727    }
728
729    async fn handle_message(
730        &mut self,
731        _sender: Option<ValidatorId>,
732        msg: ConsensusMessage,
733    ) -> Result<()> {
734        if !self.verify_message(&msg) {
735            return Ok(());
736        }
737
738        match msg {
739            ConsensusMessage::Propose {
740                block,
741                justify,
742                double_cert,
743                signature: _,
744            } => {
745                let block = *block;
746                let justify = *justify;
747                let double_cert = double_cert.map(|dc| *dc);
748
749                // If proposal is from a future view, advance to it first
750                if block.view > self.state.current_view {
751                    if let Some(ref dc) = double_cert {
752                        if !self.validate_double_cert(dc) {
753                            return Ok(());
754                        }
755
756                        // Fast-forward via double cert
757                        self.apply_commit(dc, "fast-forward").await;
758                        self.state.highest_double_cert = Some(dc.clone());
759                        self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()))
760                            .await;
761                    } else {
762                        return Ok(());
763                    }
764                } else if block.view < self.state.current_view {
765                    // Still store blocks from past views if we haven't committed
766                    // that height yet. This handles the case where fast-forward
767                    // advanced our view but we missed storing the block from the
768                    // earlier proposal. Without this, chain commits that walk
769                    // the parent chain would fail with "block not found".
770                    if block.height > self.state.last_committed_height {
771                        // Verify block hash before storing past-view blocks
772                        let expected = hotmint_crypto::compute_block_hash(&block);
773                        if block.hash == expected {
774                            let mut store = self.store.write().await;
775                            store.put_block(block);
776                        }
777                    }
778                    return Ok(());
779                }
780
781                let mut store = self.store.write().await;
782
783                // R-25: verify any DoubleCert in the same-view proposal path.
784                // The future-view path already calls validate_double_cert; the same-view path
785                // passes the DC straight to on_proposal → try_commit without verification.
786                // A Byzantine leader could inject a forged DC to trigger incorrect commits.
787                if let Some(ref dc) = double_cert
788                    && !self.validate_double_cert(dc)
789                {
790                    return Ok(());
791                }
792
793                // R-28: persist justify QC as commit evidence for the block it certifies.
794                // When blocks are committed via the 2-chain rule (possibly multiple blocks at
795                // once), the innermost block gets its own commit QC, but ancestor blocks only
796                // get the chain-rule commit and have no stored QC.  Storing the justify QC here
797                // ensures that sync responders can later serve those ancestor blocks with proof.
798                if justify.aggregate_signature.count() > 0
799                    && let Some(justified_block) = store.get_block(&justify.block_hash)
800                    && store.get_commit_qc(justified_block.height).is_none()
801                {
802                    store.put_commit_qc(justified_block.height, justify.clone());
803                }
804
805                let maybe_pending = view_protocol::on_proposal(
806                    &mut self.state,
807                    view_protocol::ProposalData {
808                        block,
809                        justify,
810                        double_cert,
811                    },
812                    store.as_mut(),
813                    self.network.as_ref(),
814                    self.app.as_ref(),
815                    self.signer.as_ref(),
816                )
817                .c(d!())?;
818                drop(store);
819
820                if let Some(epoch) = maybe_pending {
821                    self.pending_epoch = Some(epoch);
822                }
823            }
824
825            ConsensusMessage::VoteMsg(vote) => {
826                if vote.view != self.state.current_view {
827                    return Ok(());
828                }
829                if !self.state.is_leader() {
830                    return Ok(());
831                }
832                if vote.vote_type != VoteType::Vote {
833                    return Ok(());
834                }
835
836                let result = self
837                    .vote_collector
838                    .add_vote(&self.state.validator_set, vote)
839                    .c(d!())?;
840                self.handle_equivocation(&result);
841                if let Some(qc) = result.qc {
842                    self.on_qc_formed(qc).await;
843                }
844            }
845
846            ConsensusMessage::Prepare {
847                certificate,
848                signature: _,
849            } => {
850                if certificate.view < self.state.current_view {
851                    return Ok(());
852                }
853                if certificate.view == self.state.current_view {
854                    // Validate the Prepare's block app_hash if we have the block in
855                    // store. Prevents locking onto a block whose app_hash diverges from
856                    // our local state. When the block is absent (node caught up via TC),
857                    // we defer to the QC's 2f+1 signatures for safety.
858                    if self.app.tracks_app_hash() {
859                        let store = self.store.read().await;
860                        if let Some(block) = store.get_block(&certificate.block_hash)
861                            && block.app_hash != self.state.last_app_hash
862                        {
863                            warn!(
864                                block_app_hash = %block.app_hash,
865                                local_app_hash = %self.state.last_app_hash,
866                                "prepare block app_hash mismatch, ignoring"
867                            );
868                            return Ok(());
869                        }
870                    }
871                    view_protocol::on_prepare(
872                        &mut self.state,
873                        certificate,
874                        self.network.as_ref(),
875                        self.signer.as_ref(),
876                    );
877                }
878            }
879
880            ConsensusMessage::Vote2Msg(vote) => {
881                if vote.view != self.state.current_view {
882                    return Ok(());
883                }
884                let result = self
885                    .vote_collector
886                    .add_vote(&self.state.validator_set, vote)
887                    .c(d!())?;
888                self.handle_equivocation(&result);
889                if let Some(outer_qc) = result.qc {
890                    self.on_double_cert_formed(outer_qc).await;
891                }
892            }
893
894            ConsensusMessage::Wish {
895                target_view,
896                validator,
897                highest_qc,
898                signature,
899            } => {
900                // Validate carried highest_qc (C4 mitigation).
901                // Both signature authenticity and 2f+1 quorum weight must pass.
902                if let Some(ref qc) = highest_qc
903                    && qc.aggregate_signature.count() > 0
904                {
905                    let qc_bytes = Vote::signing_bytes(
906                        &self.state.chain_id_hash,
907                        qc.view,
908                        &qc.block_hash,
909                        VoteType::Vote,
910                    );
911                    if !self.verifier.verify_aggregate(
912                        &self.state.validator_set,
913                        &qc_bytes,
914                        &qc.aggregate_signature,
915                    ) {
916                        warn!(validator = %validator, "wish carries invalid highest_qc signature");
917                        return Ok(());
918                    }
919                    if !hotmint_crypto::has_quorum(
920                        &self.state.validator_set,
921                        &qc.aggregate_signature,
922                    ) {
923                        warn!(validator = %validator, "wish carries highest_qc without quorum");
924                        return Ok(());
925                    }
926                }
927
928                if let Some(tc) = self.pacemaker.add_wish(
929                    &self.state.validator_set,
930                    target_view,
931                    validator,
932                    highest_qc,
933                    signature,
934                ) {
935                    info!(
936                        validator = %self.state.validator_id,
937                        view = %tc.view,
938                        "TC formed, advancing view"
939                    );
940                    self.network
941                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
942                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
943                }
944            }
945
946            ConsensusMessage::TimeoutCert(tc) => {
947                if self.pacemaker.should_relay_tc(&tc) {
948                    self.network
949                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
950                }
951                let new_view = ViewNumber(tc.view.as_u64() + 1);
952                if new_view > self.state.current_view {
953                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
954                }
955            }
956
957            ConsensusMessage::StatusCert {
958                locked_qc,
959                validator,
960                signature: _,
961            } => {
962                if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
963                    if let Some(ref qc) = locked_qc {
964                        self.state.update_highest_qc(qc);
965                    }
966                    self.status_senders.insert(validator);
967                    let status_power: u64 = self
968                        .status_senders
969                        .iter()
970                        .map(|v| self.state.validator_set.power_of(*v))
971                        .sum();
972                    // Leader's own power counts toward quorum
973                    let total_power =
974                        status_power + self.state.validator_set.power_of(self.state.validator_id);
975                    if total_power >= self.state.validator_set.quorum_threshold() {
976                        self.try_propose().await;
977                    }
978                }
979            }
980        }
981        Ok(())
982    }
983
984    fn handle_equivocation(&self, result: &crate::vote_collector::VoteResult) {
985        if let Some(ref proof) = result.equivocation {
986            warn!(
987                validator = %proof.validator,
988                view = %proof.view,
989                "equivocation detected!"
990            );
991            if let Err(e) = self.app.on_evidence(proof) {
992                warn!(error = %e, "on_evidence callback failed");
993            }
994        }
995    }
996
997    async fn on_qc_formed(&mut self, qc: QuorumCertificate) {
998        // Save the QC so we can reliably pair it when forming a DoubleCert
999        self.current_view_qc = Some(qc.clone());
1000
1001        view_protocol::on_votes_collected(
1002            &mut self.state,
1003            qc.clone(),
1004            self.network.as_ref(),
1005            self.signer.as_ref(),
1006        );
1007
1008        // Leader also does vote2 for its own prepare (self-vote for step 5)
1009        let vote_bytes = Vote::signing_bytes(
1010            &self.state.chain_id_hash,
1011            self.state.current_view,
1012            &qc.block_hash,
1013            VoteType::Vote2,
1014        );
1015        let signature = self.signer.sign(&vote_bytes);
1016        let vote = Vote {
1017            block_hash: qc.block_hash,
1018            view: self.state.current_view,
1019            validator: self.state.validator_id,
1020            signature,
1021            vote_type: VoteType::Vote2,
1022        };
1023
1024        // Lock on this QC
1025        self.state.update_locked_qc(&qc);
1026
1027        let next_leader_id =
1028            leader::next_leader(&self.state.validator_set, self.state.current_view);
1029        if next_leader_id == self.state.validator_id {
1030            // We are the next leader, collect vote2 locally
1031            match self
1032                .vote_collector
1033                .add_vote(&self.state.validator_set, vote)
1034            {
1035                Ok(result) => {
1036                    self.handle_equivocation(&result);
1037                    if let Some(outer_qc) = result.qc {
1038                        self.on_double_cert_formed(outer_qc).await;
1039                    }
1040                }
1041                Err(e) => warn!(error = %e, "failed to add self vote2"),
1042            }
1043        } else {
1044            self.network
1045                .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
1046        }
1047    }
1048
1049    async fn on_double_cert_formed(&mut self, outer_qc: QuorumCertificate) {
1050        // Use the QC we explicitly saved from this view's first voting round
1051        let inner_qc = match self.current_view_qc.take() {
1052            Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
1053            _ => {
1054                // Fallback to locked_qc or highest_qc
1055                match &self.state.locked_qc {
1056                    Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
1057                    _ => match &self.state.highest_qc {
1058                        Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
1059                        _ => {
1060                            warn!(
1061                                validator = %self.state.validator_id,
1062                                "double cert formed but can't find matching inner QC"
1063                            );
1064                            return;
1065                        }
1066                    },
1067                }
1068            }
1069        };
1070
1071        let dc = DoubleCertificate { inner_qc, outer_qc };
1072
1073        info!(
1074            validator = %self.state.validator_id,
1075            view = %self.state.current_view,
1076            hash = %dc.inner_qc.block_hash,
1077            "double certificate formed, committing"
1078        );
1079
1080        // Commit
1081        self.apply_commit(&dc, "double-cert").await;
1082
1083        self.state.highest_double_cert = Some(dc.clone());
1084
1085        // Advance to next view — as new leader, include DC in proposal
1086        self.advance_view(ViewEntryTrigger::DoubleCert(dc)).await;
1087    }
1088
1089    async fn handle_timeout(&mut self) {
1090        // Skip wish building/signing entirely when we have no voting power (fullnodes).
1091        // build_wish involves a cryptographic signing operation that serves no purpose
1092        // when the wish will never be broadcast or counted toward a TC.
1093        let has_power = self.state.validator_set.power_of(self.state.validator_id) > 0;
1094        if !has_power {
1095            self.pacemaker.on_timeout();
1096            return;
1097        }
1098
1099        info!(
1100            validator = %self.state.validator_id,
1101            view = %self.state.current_view,
1102            "view timeout, sending wish"
1103        );
1104
1105        let wish = self.pacemaker.build_wish(
1106            &self.state.chain_id_hash,
1107            self.state.current_view,
1108            self.state.validator_id,
1109            self.state.highest_qc.clone(),
1110            self.signer.as_ref(),
1111        );
1112
1113        self.network.broadcast(wish.clone());
1114
1115        // Also process our own wish
1116        if let ConsensusMessage::Wish {
1117            target_view,
1118            validator,
1119            highest_qc,
1120            signature,
1121        } = wish
1122            && let Some(tc) = self.pacemaker.add_wish(
1123                &self.state.validator_set,
1124                target_view,
1125                validator,
1126                highest_qc,
1127                signature,
1128            )
1129        {
1130            self.network
1131                .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1132            self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1133            return;
1134        }
1135
1136        // Exponential backoff on repeated timeouts
1137        self.pacemaker.on_timeout();
1138    }
1139
1140    /// Apply the result of a successful try_commit: update app_hash, pending epoch,
1141    /// store commit QCs, and flush. Called from both normal and fast-forward commit paths.
1142    async fn apply_commit(&mut self, dc: &DoubleCertificate, context: &str) {
1143        let store = self.store.read().await;
1144        match try_commit(
1145            dc,
1146            store.as_ref(),
1147            self.app.as_ref(),
1148            &mut self.state.last_committed_height,
1149            &self.state.current_epoch,
1150        ) {
1151            Ok(result) => {
1152                if !result.committed_blocks.is_empty() {
1153                    self.state.last_app_hash = result.last_app_hash;
1154                }
1155                if result.pending_epoch.is_some() {
1156                    self.pending_epoch = result.pending_epoch;
1157                }
1158                drop(store);
1159                {
1160                    let mut s = self.store.write().await;
1161                    for block in &result.committed_blocks {
1162                        // R-28: only write the commit QC for the block it actually certifies.
1163                        // Ancestor blocks committed via the chain rule may get their QC stored
1164                        // by the justify-QC persistence in handle_message (Propose path).
1165                        // Writing the wrong QC (mismatched block_hash) here would cause sync
1166                        // verification failures for those ancestor blocks.
1167                        if result.commit_qc.block_hash == block.hash {
1168                            s.put_commit_qc(block.height, result.commit_qc.clone());
1169                        }
1170                    }
1171                    s.flush();
1172                }
1173            }
1174            Err(e) => {
1175                warn!(error = %e, "try_commit failed during {context}");
1176                drop(store);
1177            }
1178        }
1179    }
1180
1181    /// Cryptographically validate a DoubleCertificate:
1182    /// 1. inner and outer QC must reference the same block hash
1183    /// 2. inner QC aggregate signature (Vote1) must be valid and reach quorum
1184    /// 3. outer QC aggregate signature (Vote2) must be valid and reach quorum
1185    ///
1186    /// Note on quorum and epoch transitions: DCs are always formed in the same epoch as
1187    /// the block they commit (vote_collector enforces quorum at formation time).  When
1188    /// a DC is received by a node that has already transitioned to a new epoch, the
1189    /// validator set may differ.  We enforce quorum against the current validator set
1190    /// as the best available reference; a legitimate DC from a prior epoch should still
1191    /// satisfy quorum against the new set unless the set shrank significantly.
1192    fn validate_double_cert(&self, dc: &DoubleCertificate) -> bool {
1193        if dc.inner_qc.block_hash != dc.outer_qc.block_hash {
1194            warn!("double cert inner/outer block_hash mismatch");
1195            return false;
1196        }
1197        let vs = &self.state.validator_set;
1198        let inner_bytes = Vote::signing_bytes(
1199            &self.state.chain_id_hash,
1200            dc.inner_qc.view,
1201            &dc.inner_qc.block_hash,
1202            VoteType::Vote,
1203        );
1204        if !self
1205            .verifier
1206            .verify_aggregate(vs, &inner_bytes, &dc.inner_qc.aggregate_signature)
1207        {
1208            warn!("double cert inner QC signature invalid");
1209            return false;
1210        }
1211        if !hotmint_crypto::has_quorum(vs, &dc.inner_qc.aggregate_signature) {
1212            warn!("double cert inner QC below quorum threshold");
1213            return false;
1214        }
1215        let outer_bytes = Vote::signing_bytes(
1216            &self.state.chain_id_hash,
1217            dc.outer_qc.view,
1218            &dc.outer_qc.block_hash,
1219            VoteType::Vote2,
1220        );
1221        if !self
1222            .verifier
1223            .verify_aggregate(vs, &outer_bytes, &dc.outer_qc.aggregate_signature)
1224        {
1225            warn!("double cert outer QC signature invalid");
1226            return false;
1227        }
1228        if !hotmint_crypto::has_quorum(vs, &dc.outer_qc.aggregate_signature) {
1229            warn!("double cert outer QC below quorum threshold");
1230            return false;
1231        }
1232        true
1233    }
1234
1235    fn persist_state(&mut self) {
1236        if let Some(p) = self.persistence.as_mut() {
1237            p.save_current_view(self.state.current_view);
1238            if let Some(ref qc) = self.state.locked_qc {
1239                p.save_locked_qc(qc);
1240            }
1241            if let Some(ref qc) = self.state.highest_qc {
1242                p.save_highest_qc(qc);
1243            }
1244            p.save_last_committed_height(self.state.last_committed_height);
1245            p.save_current_epoch(&self.state.current_epoch);
1246            p.save_last_app_hash(self.state.last_app_hash);
1247            p.flush();
1248        }
1249    }
1250
1251    async fn advance_view(&mut self, trigger: ViewEntryTrigger) {
1252        let new_view = match &trigger {
1253            ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
1254            ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
1255            ViewEntryTrigger::Genesis => ViewNumber(1),
1256        };
1257        self.advance_view_to(new_view, trigger).await;
1258    }
1259
1260    async fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
1261        if new_view <= self.state.current_view {
1262            return;
1263        }
1264
1265        // Reset backoff on successful progress (DoubleCert path)
1266        let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
1267
1268        self.vote_collector.clear_view(self.state.current_view);
1269        self.vote_collector.prune_before(self.state.current_view);
1270        self.pacemaker.clear_view(self.state.current_view);
1271        self.pacemaker.prune_before(self.state.current_view);
1272        self.status_senders.clear();
1273        self.current_view_qc = None;
1274
1275        // Epoch transition: apply pending validator set change when we reach the
1276        // epoch's start_view. The start_view is set deterministically (commit_view + 2)
1277        // so all honest nodes apply the transition at the same view.
1278        if let Some(ref epoch) = self.pending_epoch
1279            && new_view >= epoch.start_view
1280        {
1281            let new_epoch = self.pending_epoch.take().unwrap();
1282            info!(
1283                validator = %self.state.validator_id,
1284                old_epoch = %self.state.current_epoch.number,
1285                new_epoch = %new_epoch.number,
1286                start_view = %new_epoch.start_view,
1287                validators = new_epoch.validator_set.validator_count(),
1288                "epoch transition"
1289            );
1290            self.state.validator_set = new_epoch.validator_set.clone();
1291            self.state.current_epoch = new_epoch;
1292            // Notify network layer of the new validator set
1293            self.network.on_epoch_change(&self.state.validator_set);
1294            // Full clear: old votes/wishes are from the previous epoch's validator set
1295            self.vote_collector = VoteCollector::new();
1296            self.pacemaker = Pacemaker::with_config(self.pacemaker_config.clone());
1297        }
1298
1299        view_protocol::enter_view(
1300            &mut self.state,
1301            new_view,
1302            trigger,
1303            self.network.as_ref(),
1304            self.signer.as_ref(),
1305        );
1306
1307        if is_progress {
1308            self.pacemaker.reset_on_progress();
1309        } else {
1310            self.pacemaker.reset_timer();
1311        }
1312
1313        self.persist_state();
1314
1315        // If we're the leader, propose immediately.
1316        // Note: in a full implementation, the leader would collect StatusCerts
1317        // before proposing (status_senders quorum gate). Currently the immediate
1318        // propose path is required for liveness across epoch transitions where
1319        // cross-epoch verification complexity can stall status collection.
1320        if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
1321            self.try_propose().await;
1322        }
1323    }
1324}
1325
1326// ---------------------------------------------------------------------------
1327// Regression tests for sub-quorum certificate injection (R-29, R-32)
1328// ---------------------------------------------------------------------------
1329
1330#[cfg(test)]
1331mod tests {
1332    use super::*;
1333
1334    use std::sync::Arc;
1335
1336    use tokio::sync::RwLock;
1337
1338    use hotmint_crypto::{Ed25519Signer, Ed25519Verifier};
1339    use hotmint_types::Signer as SignerTrait;
1340    use hotmint_types::certificate::QuorumCertificate;
1341    use hotmint_types::crypto::AggregateSignature;
1342    use hotmint_types::validator::{ValidatorId, ValidatorInfo};
1343    use hotmint_types::vote::{Vote, VoteType};
1344    use tokio::sync::mpsc;
1345
1346    use crate::application::NoopApplication;
1347    use crate::network::NetworkSink;
1348    use crate::state::ConsensusState;
1349    use crate::store::MemoryBlockStore;
1350
1351    // Minimal no-op network for unit tests — messages are silently discarded.
1352    struct DevNullNetwork;
1353    impl NetworkSink for DevNullNetwork {
1354        fn broadcast(&self, _: ConsensusMessage) {}
1355        fn send_to(&self, _: ValidatorId, _: ConsensusMessage) {}
1356    }
1357
1358    /// Default chain_id_hash for tests — matches ConsensusState::new() which uses chain_id = "".
1359    fn test_chain_id_hash() -> [u8; 32] {
1360        *blake3::hash(b"").as_bytes()
1361    }
1362
1363    fn make_validator_set_4() -> (ValidatorSet, Vec<Ed25519Signer>) {
1364        let signers: Vec<Ed25519Signer> = (0..4)
1365            .map(|i| Ed25519Signer::generate(ValidatorId(i)))
1366            .collect();
1367        let infos: Vec<ValidatorInfo> = signers
1368            .iter()
1369            .map(|s| ValidatorInfo {
1370                id: s.validator_id(),
1371                public_key: s.public_key(),
1372                power: 1,
1373            })
1374            .collect();
1375        (ValidatorSet::new(infos), signers)
1376    }
1377
1378    fn make_test_engine(
1379        vid: ValidatorId,
1380        vs: ValidatorSet,
1381        signer: Ed25519Signer,
1382    ) -> (
1383        ConsensusEngine,
1384        mpsc::Sender<(Option<ValidatorId>, ConsensusMessage)>,
1385    ) {
1386        let (tx, rx) = mpsc::channel(64);
1387        let store = Arc::new(RwLock::new(
1388            Box::new(MemoryBlockStore::new()) as Box<dyn crate::store::BlockStore>
1389        ));
1390        let state = ConsensusState::new(vid, vs);
1391        let engine = ConsensusEngine::new(
1392            state,
1393            store,
1394            Box::new(DevNullNetwork),
1395            Box::new(NoopApplication),
1396            Box::new(signer),
1397            rx,
1398            EngineConfig {
1399                verifier: Box::new(Ed25519Verifier),
1400                pacemaker: None,
1401                persistence: None,
1402            },
1403        );
1404        (engine, tx)
1405    }
1406
1407    // R-29 regression: a Propose message whose justify QC is signed by fewer than
1408    // 2f+1 validators must be rejected by verify_message().
1409    #[test]
1410    fn r29_propose_sub_quorum_justify_rejected_by_verify_message() {
1411        let (vs, signers) = make_validator_set_4();
1412        // Use a fresh signer for the engine; verify_message only needs the engine's
1413        // validator set and verifier, not its own signing key.
1414        let engine_signer = Ed25519Signer::generate(ValidatorId(0));
1415        let (engine, _tx) = make_test_engine(ValidatorId(0), vs.clone(), engine_signer);
1416
1417        // Build a justify QC signed by exactly 1 of 4 validators — below 2f+1 = 3.
1418        let chain_id_hash = test_chain_id_hash();
1419        let hash = BlockHash::GENESIS;
1420        let qc_view = ViewNumber::GENESIS;
1421        let vote_bytes = Vote::signing_bytes(&chain_id_hash, qc_view, &hash, VoteType::Vote);
1422        let mut agg = AggregateSignature::new(4);
1423        agg.add(1, SignerTrait::sign(&signers[1], &vote_bytes))
1424            .unwrap();
1425        let sub_quorum_qc = QuorumCertificate {
1426            block_hash: hash,
1427            view: qc_view,
1428            aggregate_signature: agg,
1429        };
1430
1431        // Construct a proposal from V1 carrying this sub-quorum justify.
1432        let mut block = Block::genesis();
1433        block.height = Height(1);
1434        block.view = ViewNumber(1);
1435        block.proposer = ValidatorId(1);
1436        block.hash = block.compute_hash();
1437        let proposal_bytes =
1438            crate::view_protocol::proposal_signing_bytes(&chain_id_hash, &block, &sub_quorum_qc);
1439        let signature = SignerTrait::sign(&signers[1], &proposal_bytes);
1440
1441        let msg = ConsensusMessage::Propose {
1442            block: Box::new(block),
1443            justify: Box::new(sub_quorum_qc),
1444            double_cert: None,
1445            signature,
1446        };
1447
1448        assert!(
1449            !engine.verify_message(&msg),
1450            "R-29 regression: Propose with sub-quorum justify QC must be rejected by verify_message"
1451        );
1452    }
1453
1454    // R-29 regression: a Propose message with a full quorum justify QC (3/4) must pass.
1455    #[test]
1456    fn r29_propose_full_quorum_justify_accepted_by_verify_message() {
1457        let (vs, signers) = make_validator_set_4();
1458        let engine_signer = Ed25519Signer::generate(ValidatorId(0));
1459        let (engine, _tx) = make_test_engine(ValidatorId(0), vs.clone(), engine_signer);
1460
1461        let chain_id_hash = test_chain_id_hash();
1462        let hash = BlockHash::GENESIS;
1463        let qc_view = ViewNumber::GENESIS;
1464        let vote_bytes = Vote::signing_bytes(&chain_id_hash, qc_view, &hash, VoteType::Vote);
1465        // 3 of 4 signers — meets 2f+1 threshold.
1466        let mut agg = AggregateSignature::new(4);
1467        for (i, signer) in signers.iter().take(3).enumerate() {
1468            agg.add(i, SignerTrait::sign(signer, &vote_bytes)).unwrap();
1469        }
1470        let full_quorum_qc = QuorumCertificate {
1471            block_hash: hash,
1472            view: qc_view,
1473            aggregate_signature: agg,
1474        };
1475
1476        let mut block = Block::genesis();
1477        block.height = Height(1);
1478        block.view = ViewNumber(1);
1479        block.proposer = ValidatorId(1);
1480        block.hash = block.compute_hash();
1481        let proposal_bytes =
1482            crate::view_protocol::proposal_signing_bytes(&chain_id_hash, &block, &full_quorum_qc);
1483        let signature = SignerTrait::sign(&signers[1], &proposal_bytes);
1484
1485        let msg = ConsensusMessage::Propose {
1486            block: Box::new(block),
1487            justify: Box::new(full_quorum_qc),
1488            double_cert: None,
1489            signature,
1490        };
1491
1492        assert!(
1493            engine.verify_message(&msg),
1494            "R-29: Propose with full quorum justify QC must pass verify_message"
1495        );
1496    }
1497
1498    // R-32 regression: a Wish carrying a sub-quorum highest_qc must cause
1499    // verify_highest_qc_in_wish to treat the QC as invalid and return false,
1500    // which causes handle_message to discard the Wish without forwarding it
1501    // to the pacemaker.
1502    //
1503    // We verify the sub-component: has_quorum returns false for a 1-of-4 aggregate,
1504    // ensuring the guard in handle_message fires.
1505    #[test]
1506    fn r32_sub_quorum_highest_qc_fails_has_quorum() {
1507        let (vs, signers) = make_validator_set_4();
1508
1509        let chain_id_hash = test_chain_id_hash();
1510        let hash = BlockHash([1u8; 32]);
1511        let qc_view = ViewNumber(1);
1512        let vote_bytes = Vote::signing_bytes(&chain_id_hash, qc_view, &hash, VoteType::Vote);
1513
1514        // Build a QC with only 1 signer — sub-quorum.
1515        let mut agg = AggregateSignature::new(4);
1516        agg.add(0, SignerTrait::sign(&signers[0], &vote_bytes))
1517            .unwrap();
1518
1519        assert!(
1520            !hotmint_crypto::has_quorum(&vs, &agg),
1521            "R-32 regression: 1-of-4 signed QC must not satisfy has_quorum"
1522        );
1523
1524        // Build a QC with 3 signers — full quorum.
1525        let mut agg_full = AggregateSignature::new(4);
1526        for (i, signer) in signers.iter().take(3).enumerate() {
1527            agg_full
1528                .add(i, SignerTrait::sign(signer, &vote_bytes))
1529                .unwrap();
1530        }
1531        assert!(
1532            hotmint_crypto::has_quorum(&vs, &agg_full),
1533            "R-32: 3-of-4 signed QC must satisfy has_quorum"
1534        );
1535    }
1536}