1use ruc::*;
2
3use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5
6use tokio::sync::RwLock;
7
8use crate::application::Application;
9use crate::commit::try_commit;
10use crate::evidence_store::EvidenceStore;
11use crate::leader;
12use crate::network::NetworkSink;
13use crate::pacemaker::{Pacemaker, PacemakerConfig};
14use crate::state::{ConsensusState, ViewStep};
15use crate::store::BlockStore;
16use crate::view_protocol::{self, ViewEntryTrigger};
17use crate::vote_collector::VoteCollector;
18
19use hotmint_types::epoch::Epoch;
20use hotmint_types::vote::VoteType;
21use hotmint_types::*;
22use tokio::sync::mpsc;
23use tracing::{info, warn};
24
25pub type SharedBlockStore = Arc<RwLock<Box<dyn BlockStore>>>;
27
28pub trait StatePersistence: Send {
30 fn save_current_view(&mut self, view: ViewNumber);
31 fn save_locked_qc(&mut self, qc: &QuorumCertificate);
32 fn save_highest_qc(&mut self, qc: &QuorumCertificate);
33 fn save_last_committed_height(&mut self, height: Height);
34 fn save_current_epoch(&mut self, epoch: &Epoch);
35 fn save_last_app_hash(&mut self, hash: BlockHash);
36 fn flush(&self);
37}
38
39pub struct ConsensusEngine {
40 state: ConsensusState,
41 store: SharedBlockStore,
42 network: Box<dyn NetworkSink>,
43 app: Box<dyn Application>,
44 signer: Box<dyn Signer>,
45 verifier: Box<dyn Verifier>,
46 vote_collector: VoteCollector,
47 pacemaker: Pacemaker,
48 pacemaker_config: PacemakerConfig,
49 msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
50 status_senders: HashSet<ValidatorId>,
52 current_view_qc: Option<QuorumCertificate>,
54 pending_epoch: Option<Epoch>,
56 persistence: Option<Box<dyn StatePersistence>>,
58 evidence_store: Option<Box<dyn EvidenceStore>>,
60}
61
62pub struct EngineConfig {
64 pub verifier: Box<dyn Verifier>,
65 pub pacemaker: Option<PacemakerConfig>,
66 pub persistence: Option<Box<dyn StatePersistence>>,
67 pub evidence_store: Option<Box<dyn EvidenceStore>>,
68}
69
70impl EngineConfig {
71 pub fn new(verifier: Box<dyn Verifier>) -> Self {
74 Self {
75 verifier,
76 pacemaker: None,
77 persistence: None,
78 evidence_store: None,
79 }
80 }
81
82 pub fn with_pacemaker(mut self, pacemaker: PacemakerConfig) -> Self {
84 self.pacemaker = Some(pacemaker);
85 self
86 }
87
88 pub fn with_persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
90 self.persistence = Some(persistence);
91 self
92 }
93}
94
95pub struct ConsensusEngineBuilder {
111 state: Option<ConsensusState>,
112 store: Option<SharedBlockStore>,
113 network: Option<Box<dyn NetworkSink>>,
114 app: Option<Box<dyn Application>>,
115 signer: Option<Box<dyn Signer>>,
116 msg_rx: Option<mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>>,
117 verifier: Option<Box<dyn Verifier>>,
118 pacemaker: Option<PacemakerConfig>,
119 persistence: Option<Box<dyn StatePersistence>>,
120 evidence_store: Option<Box<dyn EvidenceStore>>,
121}
122
123impl ConsensusEngineBuilder {
124 pub fn new() -> Self {
125 Self {
126 state: None,
127 store: None,
128 network: None,
129 app: None,
130 signer: None,
131 msg_rx: None,
132 verifier: None,
133 pacemaker: None,
134 persistence: None,
135 evidence_store: None,
136 }
137 }
138
139 pub fn state(mut self, state: ConsensusState) -> Self {
140 self.state = Some(state);
141 self
142 }
143
144 pub fn store(mut self, store: SharedBlockStore) -> Self {
145 self.store = Some(store);
146 self
147 }
148
149 pub fn network(mut self, network: Box<dyn NetworkSink>) -> Self {
150 self.network = Some(network);
151 self
152 }
153
154 pub fn app(mut self, app: Box<dyn Application>) -> Self {
155 self.app = Some(app);
156 self
157 }
158
159 pub fn signer(mut self, signer: Box<dyn Signer>) -> Self {
160 self.signer = Some(signer);
161 self
162 }
163
164 pub fn messages(
165 mut self,
166 msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
167 ) -> Self {
168 self.msg_rx = Some(msg_rx);
169 self
170 }
171
172 pub fn verifier(mut self, verifier: Box<dyn Verifier>) -> Self {
173 self.verifier = Some(verifier);
174 self
175 }
176
177 pub fn pacemaker(mut self, config: PacemakerConfig) -> Self {
178 self.pacemaker = Some(config);
179 self
180 }
181
182 pub fn persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
183 self.persistence = Some(persistence);
184 self
185 }
186
187 pub fn evidence_store(mut self, store: Box<dyn EvidenceStore>) -> Self {
188 self.evidence_store = Some(store);
189 self
190 }
191
192 pub fn build(self) -> ruc::Result<ConsensusEngine> {
193 let state = self.state.ok_or_else(|| ruc::eg!("state is required"))?;
194 let store = self.store.ok_or_else(|| ruc::eg!("store is required"))?;
195 let network = self
196 .network
197 .ok_or_else(|| ruc::eg!("network is required"))?;
198 let app = self.app.ok_or_else(|| ruc::eg!("app is required"))?;
199 let signer = self.signer.ok_or_else(|| ruc::eg!("signer is required"))?;
200 let msg_rx = self
201 .msg_rx
202 .ok_or_else(|| ruc::eg!("messages (msg_rx) is required"))?;
203 let verifier = self
204 .verifier
205 .ok_or_else(|| ruc::eg!("verifier is required"))?;
206
207 let config = EngineConfig {
208 verifier,
209 pacemaker: self.pacemaker,
210 persistence: self.persistence,
211 evidence_store: self.evidence_store,
212 };
213
214 Ok(ConsensusEngine::new(
215 state, store, network, app, signer, msg_rx, config,
216 ))
217 }
218}
219
220impl Default for ConsensusEngineBuilder {
221 fn default() -> Self {
222 Self::new()
223 }
224}
225
226impl ConsensusEngine {
227 pub fn new(
228 state: ConsensusState,
229 store: SharedBlockStore,
230 network: Box<dyn NetworkSink>,
231 app: Box<dyn Application>,
232 signer: Box<dyn Signer>,
233 msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
234 config: EngineConfig,
235 ) -> Self {
236 let pc = config.pacemaker.unwrap_or_default();
237 Self {
238 state,
239 store,
240 network,
241 app,
242 signer,
243 verifier: config.verifier,
244 vote_collector: VoteCollector::new(),
245 pacemaker: Pacemaker::with_config(pc.clone()),
246 pacemaker_config: pc,
247 msg_rx,
248 status_senders: HashSet::new(),
249 current_view_qc: None,
250 pending_epoch: None,
251 persistence: config.persistence,
252 evidence_store: config.evidence_store,
253 }
254 }
255
256 pub async fn run(mut self) {
259 if self.state.current_view.as_u64() <= 1 {
260 self.enter_genesis_view().await;
261 } else {
262 info!(
263 validator = %self.state.validator_id,
264 view = %self.state.current_view,
265 height = %self.state.last_committed_height,
266 "resuming from persisted state"
267 );
268 self.pacemaker.reset_timer();
269 }
270
271 loop {
272 let deadline = self.pacemaker.sleep_until_deadline();
273 tokio::pin!(deadline);
274
275 tokio::select! {
276 Some((sender, msg)) = self.msg_rx.recv() => {
277 if let Err(e) = self.handle_message(sender, msg).await {
278 warn!(validator = %self.state.validator_id, error = %e, "error handling message");
279 }
280 }
281 _ = &mut deadline => {
282 self.handle_timeout().await;
283 }
284 }
285 }
286 }
287
288 async fn enter_genesis_view(&mut self) {
289 let genesis_qc = QuorumCertificate {
291 block_hash: BlockHash::GENESIS,
292 view: ViewNumber::GENESIS,
293 aggregate_signature: AggregateSignature::new(
294 self.state.validator_set.validator_count(),
295 ),
296 epoch: self.state.current_epoch.number,
297 };
298 self.state.highest_qc = Some(genesis_qc);
299
300 let view = ViewNumber(1);
301 view_protocol::enter_view(
302 &mut self.state,
303 view,
304 ViewEntryTrigger::Genesis,
305 self.network.as_ref(),
306 self.signer.as_ref(),
307 );
308 self.pacemaker.reset_timer();
309
310 if self.state.is_leader() {
312 self.state.step = ViewStep::WaitingForStatus;
313 self.try_propose().await;
315 }
316 }
317
318 fn try_propose(
319 &mut self,
320 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
321 Box::pin(async move {
322 let proposed_block = {
324 let mut store = self.store.write().await;
325 view_protocol::propose(
326 &mut self.state,
327 store.as_mut(),
328 self.network.as_ref(),
329 self.app.as_ref(),
330 self.signer.as_ref(),
331 )
332 }; match proposed_block {
335 Ok(block) => {
336 if let Some(ref store) = self.evidence_store {
338 let pending = store.get_pending();
339 if !pending.is_empty() {
340 info!(
341 validator = %self.state.validator_id,
342 count = pending.len(),
343 "pending equivocation evidence available for inclusion"
344 );
345 }
346 }
347 self.leader_self_vote(block.hash).await;
349 }
350 Err(e) => {
351 warn!(
352 validator = %self.state.validator_id,
353 error = %e,
354 "failed to propose"
355 );
356 }
357 }
358 })
359 }
360
361 async fn leader_self_vote(&mut self, block_hash: BlockHash) {
362 let vote_bytes = Vote::signing_bytes(
363 &self.state.chain_id_hash,
364 self.state.current_epoch.number,
365 self.state.current_view,
366 &block_hash,
367 VoteType::Vote,
368 );
369 let signature = self.signer.sign(&vote_bytes);
370 let vote = Vote {
371 block_hash,
372 view: self.state.current_view,
373 validator: self.state.validator_id,
374 signature,
375 vote_type: VoteType::Vote,
376 extension: None,
377 };
378 match self.vote_collector.add_vote(
379 &self.state.validator_set,
380 vote,
381 self.state.current_epoch.number,
382 ) {
383 Ok(result) => {
384 self.handle_equivocation(&result);
385 if let Some(qc) = result.qc {
386 self.on_qc_formed(qc).await;
387 }
388 }
389 Err(e) => warn!(error = %e, "failed to add self vote"),
390 }
391 }
392}
393
394pub fn verify_relay_sender(
411 sender: ValidatorId,
412 msg: &ConsensusMessage,
413 validator_keys: &HashMap<ValidatorId, hotmint_types::crypto::PublicKey>,
414 ordered_validators: &[ValidatorId],
415 chain_id_hash: &[u8; 32],
416 epoch: hotmint_types::epoch::EpochNumber,
417) -> bool {
418 use hotmint_crypto::Ed25519Verifier;
419 use hotmint_types::Verifier;
420 use hotmint_types::vote::Vote;
421 let verifier = Ed25519Verifier;
422 match msg {
423 ConsensusMessage::Propose {
424 block,
425 justify,
426 signature,
427 ..
428 } => {
429 let Some(pk) = validator_keys.get(&block.proposer) else {
430 return false;
431 };
432 let bytes =
433 crate::view_protocol::proposal_signing_bytes(chain_id_hash, epoch, block, justify);
434 Verifier::verify(&verifier, pk, &bytes, signature)
435 }
436 ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
437 let Some(pk) = validator_keys.get(&vote.validator) else {
438 return false;
439 };
440 let bytes = Vote::signing_bytes(
441 chain_id_hash,
442 epoch,
443 vote.view,
444 &vote.block_hash,
445 vote.vote_type,
446 );
447 Verifier::verify(&verifier, pk, &bytes, &vote.signature)
448 }
449 ConsensusMessage::Prepare {
450 certificate,
451 signature,
452 } => {
453 if !ordered_validators.is_empty() {
456 let n = ordered_validators.len();
457 let expected_leader = ordered_validators[certificate.view.as_u64() as usize % n];
458 if sender != expected_leader {
459 return false;
460 }
461 }
462 let Some(pk) = validator_keys.get(&sender) else {
463 return false;
464 };
465 let bytes =
466 crate::view_protocol::prepare_signing_bytes(chain_id_hash, epoch, certificate);
467 Verifier::verify(&verifier, pk, &bytes, signature)
468 }
469 ConsensusMessage::Wish {
470 target_view,
471 validator,
472 highest_qc,
473 signature,
474 } => {
475 let Some(pk) = validator_keys.get(validator) else {
476 return false;
477 };
478 let bytes = crate::pacemaker::wish_signing_bytes(
479 chain_id_hash,
480 epoch,
481 *target_view,
482 highest_qc.as_ref(),
483 );
484 Verifier::verify(&verifier, pk, &bytes, signature)
485 }
486 ConsensusMessage::TimeoutCert(tc) => {
487 let target_view = ViewNumber(tc.view.as_u64() + 1);
489 let n = ordered_validators.len();
490 if n == 0 || tc.aggregate_signature.signers.len() != n {
491 return false;
492 }
493 let mut sig_idx = 0usize;
494 let mut verified_count = 0usize;
495 for (i, &signed) in tc.aggregate_signature.signers.iter().enumerate() {
496 if !signed {
497 continue;
498 }
499 if i >= n {
500 return false;
501 }
502 let vid = ordered_validators[i];
503 let Some(pk) = validator_keys.get(&vid) else {
504 return false;
505 };
506 let hqc = tc.highest_qcs.get(i).and_then(|h| h.as_ref());
507 let bytes =
508 crate::pacemaker::wish_signing_bytes(chain_id_hash, epoch, target_view, hqc);
509 if sig_idx >= tc.aggregate_signature.signatures.len() {
510 return false;
511 }
512 if !Verifier::verify(
513 &verifier,
514 pk,
515 &bytes,
516 &tc.aggregate_signature.signatures[sig_idx],
517 ) {
518 return false;
519 }
520 sig_idx += 1;
521 verified_count += 1;
522 }
523 if sig_idx != tc.aggregate_signature.signatures.len() {
524 return false;
525 }
526 verified_count * 3 > n * 2
529 }
530 ConsensusMessage::StatusCert {
531 validator,
532 signature,
533 locked_qc,
534 ..
535 } => {
536 let Some(pk) = validator_keys.get(validator) else {
542 return false;
543 };
544 let _ = (pk, signature, locked_qc);
548 true
549 }
550 ConsensusMessage::Evidence(_) => {
551 validator_keys.contains_key(&sender)
554 }
555 }
556}
557
558impl ConsensusEngine {
559 fn verification_epochs(&self) -> [EpochNumber; 2] {
563 let cur = self.state.current_epoch.number;
564 let prev = if cur.as_u64() > 0 {
565 EpochNumber(cur.as_u64() - 1)
566 } else {
567 cur
568 };
569 [cur, prev]
570 }
571
572 fn verify_message(&self, msg: &ConsensusMessage) -> bool {
580 let msg_view = match msg {
585 ConsensusMessage::Propose { .. } => None, ConsensusMessage::VoteMsg(v) | ConsensusMessage::Vote2Msg(v) => Some(v.view),
587 ConsensusMessage::Prepare { certificate, .. } => Some(certificate.view),
588 ConsensusMessage::Wish { target_view, .. } => Some(*target_view),
589 ConsensusMessage::TimeoutCert(tc) => Some(ViewNumber(tc.view.as_u64() + 1)),
590 ConsensusMessage::StatusCert { .. } => None,
591 ConsensusMessage::Evidence(_) => None, };
593 if let Some(v) = msg_view
594 && v < self.state.current_view
595 {
596 return true; }
598
599 let vs = &self.state.validator_set;
600 match msg {
601 ConsensusMessage::Propose {
602 block,
603 justify,
604 signature,
605 ..
606 } => {
607 let proposer = vs.get(block.proposer);
608 let Some(vi) = proposer else {
609 warn!(proposer = %block.proposer, "propose from unknown validator");
610 return false;
611 };
612 let mut proposal_ok = false;
613 for epoch in self.verification_epochs() {
614 let bytes = view_protocol::proposal_signing_bytes(
615 &self.state.chain_id_hash,
616 epoch,
617 block,
618 justify,
619 );
620 if self.verifier.verify(&vi.public_key, &bytes, signature) {
621 proposal_ok = true;
622 break;
623 }
624 }
625 if !proposal_ok {
626 warn!(proposer = %block.proposer, "invalid proposal signature");
627 return false;
628 }
629 if justify.aggregate_signature.count() > 0 {
631 let qc_bytes = Vote::signing_bytes(
632 &self.state.chain_id_hash,
633 justify.epoch,
634 justify.view,
635 &justify.block_hash,
636 VoteType::Vote,
637 );
638 if !self
639 .verifier
640 .verify_aggregate(vs, &qc_bytes, &justify.aggregate_signature)
641 {
642 warn!(proposer = %block.proposer, "invalid justify QC aggregate signature");
643 return false;
644 }
645 if !hotmint_crypto::has_quorum(vs, &justify.aggregate_signature) {
646 warn!(proposer = %block.proposer, "justify QC below quorum threshold");
647 return false;
648 }
649 }
650 true
651 }
652 ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
653 let Some(vi) = vs.get(vote.validator) else {
654 warn!(validator = %vote.validator, "vote from unknown validator");
655 return false;
656 };
657 let mut ok = false;
658 for epoch in self.verification_epochs() {
659 let bytes = Vote::signing_bytes(
660 &self.state.chain_id_hash,
661 epoch,
662 vote.view,
663 &vote.block_hash,
664 vote.vote_type,
665 );
666 if self
667 .verifier
668 .verify(&vi.public_key, &bytes, &vote.signature)
669 {
670 ok = true;
671 break;
672 }
673 }
674 if !ok {
675 warn!(validator = %vote.validator, "invalid vote signature");
676 return false;
677 }
678 true
679 }
680 ConsensusMessage::Prepare {
681 certificate,
682 signature,
683 } => {
684 let Some(leader) = vs.leader_for_view(certificate.view) else {
686 return false;
687 };
688 let mut prepare_ok = false;
689 for epoch in self.verification_epochs() {
690 let bytes = view_protocol::prepare_signing_bytes(
691 &self.state.chain_id_hash,
692 epoch,
693 certificate,
694 );
695 if self.verifier.verify(&leader.public_key, &bytes, signature) {
696 prepare_ok = true;
697 break;
698 }
699 }
700 if !prepare_ok {
701 warn!(view = %certificate.view, "invalid prepare signature");
702 return false;
703 }
704 let qc_bytes = Vote::signing_bytes(
706 &self.state.chain_id_hash,
707 certificate.epoch,
708 certificate.view,
709 &certificate.block_hash,
710 VoteType::Vote,
711 );
712 if !self
713 .verifier
714 .verify_aggregate(vs, &qc_bytes, &certificate.aggregate_signature)
715 {
716 warn!(view = %certificate.view, "invalid QC aggregate signature");
717 return false;
718 }
719 if !hotmint_crypto::has_quorum(vs, &certificate.aggregate_signature) {
720 warn!(view = %certificate.view, "Prepare QC below quorum threshold");
721 return false;
722 }
723 true
724 }
725 ConsensusMessage::Wish {
726 target_view,
727 validator,
728 highest_qc,
729 signature,
730 } => {
731 let Some(vi) = vs.get(*validator) else {
732 warn!(validator = %validator, "wish from unknown validator");
733 return false;
734 };
735 let mut wish_ok = false;
737 for epoch in self.verification_epochs() {
738 let bytes = crate::pacemaker::wish_signing_bytes(
739 &self.state.chain_id_hash,
740 epoch,
741 *target_view,
742 highest_qc.as_ref(),
743 );
744 if self.verifier.verify(&vi.public_key, &bytes, signature) {
745 wish_ok = true;
746 break;
747 }
748 }
749 if !wish_ok {
750 warn!(validator = %validator, "invalid wish signature");
751 return false;
752 }
753 true
754 }
755 ConsensusMessage::TimeoutCert(tc) => {
756 let target_view = ViewNumber(tc.view.as_u64() + 1);
762 let n = vs.validator_count();
763 if tc.aggregate_signature.signers.len() != n {
764 warn!(view = %tc.view, "TC signers bitfield length mismatch");
765 return false;
766 }
767 let mut sig_idx = 0usize;
768 let mut power = 0u64;
769 for (i, &signed) in tc.aggregate_signature.signers.iter().enumerate() {
770 if !signed {
771 continue;
772 }
773 let Some(vi) = vs.validators().get(i) else {
774 warn!(view = %tc.view, validator_idx = i, "TC signer index out of validator set");
775 return false;
776 };
777 let hqc = tc.highest_qcs.get(i).and_then(|h| h.as_ref());
778 if sig_idx >= tc.aggregate_signature.signatures.len() {
779 warn!(view = %tc.view, "TC aggregate_signature has fewer sigs than signers");
780 return false;
781 }
782 let mut tc_sig_ok = false;
783 for epoch in self.verification_epochs() {
784 let bytes = crate::pacemaker::wish_signing_bytes(
785 &self.state.chain_id_hash,
786 epoch,
787 target_view,
788 hqc,
789 );
790 if self.verifier.verify(
791 &vi.public_key,
792 &bytes,
793 &tc.aggregate_signature.signatures[sig_idx],
794 ) {
795 tc_sig_ok = true;
796 break;
797 }
798 }
799 if !tc_sig_ok {
800 warn!(view = %tc.view, validator = %vi.id, "TC signer signature invalid");
801 return false;
802 }
803 power += vs.power_of(vi.id);
804 sig_idx += 1;
805 }
806 if sig_idx != tc.aggregate_signature.signatures.len() {
807 warn!(view = %tc.view, "TC has extra signatures beyond bitfield");
808 return false;
809 }
810 if power < vs.quorum_threshold() {
811 warn!(view = %tc.view, power, threshold = vs.quorum_threshold(), "TC insufficient quorum");
812 return false;
813 }
814 true
815 }
816 ConsensusMessage::StatusCert {
817 locked_qc,
818 validator,
819 signature,
820 } => {
821 let Some(vi) = vs.get(*validator) else {
822 warn!(validator = %validator, "status from unknown validator");
823 return false;
824 };
825 let mut status_ok = false;
826 for epoch in self.verification_epochs() {
827 let bytes = view_protocol::status_signing_bytes(
828 &self.state.chain_id_hash,
829 epoch,
830 self.state.current_view,
831 locked_qc,
832 );
833 if self.verifier.verify(&vi.public_key, &bytes, signature) {
834 status_ok = true;
835 break;
836 }
837 }
838 if !status_ok {
839 warn!(validator = %validator, "invalid status signature");
840 return false;
841 }
842 true
843 }
844 ConsensusMessage::Evidence(_) => {
845 true
849 }
850 }
851 }
852
853 async fn handle_message(
854 &mut self,
855 _sender: Option<ValidatorId>,
856 msg: ConsensusMessage,
857 ) -> Result<()> {
858 let verified = tokio::task::block_in_place(|| self.verify_message(&msg));
863 if !verified {
864 return Ok(());
865 }
866
867 match msg {
868 ConsensusMessage::Propose {
869 block,
870 justify,
871 double_cert,
872 signature: _,
873 } => {
874 let block = *block;
875 let justify = *justify;
876 let double_cert = double_cert.map(|dc| *dc);
877
878 if block.view > self.state.current_view {
880 if let Some(ref dc) = double_cert {
881 if !tokio::task::block_in_place(|| self.validate_double_cert(dc)) {
882 return Ok(());
883 }
884
885 self.apply_commit(dc, "fast-forward").await;
887 self.state.highest_double_cert = Some(dc.clone());
888 self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()))
889 .await;
890 } else {
891 return Ok(());
892 }
893 } else if block.view < self.state.current_view {
894 if block.height > self.state.last_committed_height {
900 let expected = hotmint_crypto::compute_block_hash(&block);
902 if block.hash == expected {
903 let mut store = self.store.write().await;
904 store.put_block(block);
905 }
906 }
907 return Ok(());
908 }
909
910 let mut store = self.store.write().await;
911
912 if let Some(ref dc) = double_cert
917 && !tokio::task::block_in_place(|| self.validate_double_cert(dc))
918 {
919 return Ok(());
920 }
921
922 if justify.aggregate_signature.count() > 0
928 && let Some(justified_block) = store.get_block(&justify.block_hash)
929 && store.get_commit_qc(justified_block.height).is_none()
930 {
931 store.put_commit_qc(justified_block.height, justify.clone());
932 }
933
934 let maybe_pending = view_protocol::on_proposal(
935 &mut self.state,
936 view_protocol::ProposalData {
937 block,
938 justify,
939 double_cert,
940 },
941 store.as_mut(),
942 self.network.as_ref(),
943 self.app.as_ref(),
944 self.signer.as_ref(),
945 )
946 .c(d!())?;
947 drop(store);
948
949 if let Some(epoch) = maybe_pending {
950 self.pending_epoch = Some(epoch);
951 }
952 }
953
954 ConsensusMessage::VoteMsg(vote) => {
955 if vote.view != self.state.current_view {
956 return Ok(());
957 }
958 if !self.state.is_leader() {
959 return Ok(());
960 }
961 if vote.vote_type != VoteType::Vote {
962 return Ok(());
963 }
964
965 let result = self
966 .vote_collector
967 .add_vote(
968 &self.state.validator_set,
969 vote,
970 self.state.current_epoch.number,
971 )
972 .c(d!())?;
973 self.handle_equivocation(&result);
974 if let Some(qc) = result.qc {
975 self.on_qc_formed(qc).await;
976 }
977 }
978
979 ConsensusMessage::Prepare {
980 certificate,
981 signature: _,
982 } => {
983 if certificate.view < self.state.current_view {
984 return Ok(());
985 }
986 if certificate.view == self.state.current_view {
987 let store = self.store.read().await;
992 let block_opt = store.get_block(&certificate.block_hash);
993 if self.app.tracks_app_hash()
994 && let Some(ref block) = block_opt
995 && block.app_hash != self.state.last_app_hash
996 {
997 warn!(
998 block_app_hash = %block.app_hash,
999 local_app_hash = %self.state.last_app_hash,
1000 "prepare block app_hash mismatch, ignoring"
1001 );
1002 return Ok(());
1003 }
1004
1005 let vote_extension = block_opt.and_then(|block| {
1008 let ctx = BlockContext {
1009 height: block.height,
1010 view: self.state.current_view,
1011 proposer: block.proposer,
1012 epoch: self.state.current_epoch.number,
1013 epoch_start_view: self.state.current_epoch.start_view,
1014 validator_set: &self.state.validator_set,
1015 };
1016 self.app.extend_vote(&block, &ctx)
1017 });
1018 drop(store);
1019
1020 view_protocol::on_prepare(
1021 &mut self.state,
1022 certificate,
1023 self.network.as_ref(),
1024 self.signer.as_ref(),
1025 vote_extension,
1026 );
1027 }
1028 }
1029
1030 ConsensusMessage::Vote2Msg(vote) => {
1031 if vote.view != self.state.current_view {
1032 return Ok(());
1033 }
1034 if vote.vote_type != VoteType::Vote2 {
1035 return Ok(());
1036 }
1037
1038 if let Some(ref ext) = vote.extension
1040 && !self
1041 .app
1042 .verify_vote_extension(ext, &vote.block_hash, vote.validator)
1043 {
1044 warn!(
1045 validator = %vote.validator,
1046 view = %vote.view,
1047 "rejecting vote2: invalid vote extension"
1048 );
1049 return Ok(());
1050 }
1051
1052 let result = self
1053 .vote_collector
1054 .add_vote(
1055 &self.state.validator_set,
1056 vote,
1057 self.state.current_epoch.number,
1058 )
1059 .c(d!())?;
1060 self.handle_equivocation(&result);
1061 if let Some(outer_qc) = result.qc {
1062 self.on_double_cert_formed(outer_qc).await;
1063 }
1064 }
1065
1066 ConsensusMessage::Wish {
1067 target_view,
1068 validator,
1069 highest_qc,
1070 signature,
1071 } => {
1072 if let Some(ref qc) = highest_qc
1075 && qc.aggregate_signature.count() > 0
1076 {
1077 let qc_bytes = Vote::signing_bytes(
1078 &self.state.chain_id_hash,
1079 qc.epoch,
1080 qc.view,
1081 &qc.block_hash,
1082 VoteType::Vote,
1083 );
1084 if !tokio::task::block_in_place(|| {
1085 self.verifier.verify_aggregate(
1086 &self.state.validator_set,
1087 &qc_bytes,
1088 &qc.aggregate_signature,
1089 )
1090 }) {
1091 warn!(validator = %validator, "wish carries invalid highest_qc signature");
1092 return Ok(());
1093 }
1094 if !hotmint_crypto::has_quorum(
1095 &self.state.validator_set,
1096 &qc.aggregate_signature,
1097 ) {
1098 warn!(validator = %validator, "wish carries highest_qc without quorum");
1099 return Ok(());
1100 }
1101 }
1102
1103 if let Some(tc) = self.pacemaker.add_wish(
1104 &self.state.validator_set,
1105 target_view,
1106 validator,
1107 highest_qc,
1108 signature,
1109 ) {
1110 info!(
1111 validator = %self.state.validator_id,
1112 view = %tc.view,
1113 "TC formed, advancing view"
1114 );
1115 self.network
1116 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1117 self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1118 }
1119 }
1120
1121 ConsensusMessage::TimeoutCert(tc) => {
1122 if self.pacemaker.should_relay_tc(&tc) {
1123 self.network
1124 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1125 }
1126 let new_view = ViewNumber(tc.view.as_u64() + 1);
1127 if new_view > self.state.current_view {
1128 self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1129 }
1130 }
1131
1132 ConsensusMessage::StatusCert {
1133 locked_qc,
1134 validator,
1135 signature: _,
1136 } => {
1137 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
1138 if let Some(ref qc) = locked_qc {
1139 self.state.update_highest_qc(qc);
1140 }
1141 self.status_senders.insert(validator);
1142 let status_power: u64 = self
1143 .status_senders
1144 .iter()
1145 .map(|v| self.state.validator_set.power_of(*v))
1146 .sum();
1147 let total_power =
1149 status_power + self.state.validator_set.power_of(self.state.validator_id);
1150 if total_power >= self.state.validator_set.quorum_threshold() {
1151 self.try_propose().await;
1152 }
1153 }
1154 }
1155
1156 ConsensusMessage::Evidence(proof) => {
1157 let vs = &self.state.validator_set;
1161 let vi = match vs.get(proof.validator) {
1162 Some(vi) => vi,
1163 None => {
1164 warn!(validator = %proof.validator, "evidence for unknown validator");
1165 return Ok(());
1166 }
1167 };
1168 if proof.block_hash_a == proof.block_hash_b {
1169 warn!(validator = %proof.validator, "evidence has identical block hashes");
1170 return Ok(());
1171 }
1172 let bytes_a = Vote::signing_bytes(
1175 &self.state.chain_id_hash,
1176 proof.epoch,
1177 proof.view,
1178 &proof.block_hash_a,
1179 proof.vote_type,
1180 );
1181 let bytes_b = Vote::signing_bytes(
1182 &self.state.chain_id_hash,
1183 proof.epoch,
1184 proof.view,
1185 &proof.block_hash_b,
1186 proof.vote_type,
1187 );
1188 if !self
1189 .verifier
1190 .verify(&vi.public_key, &bytes_a, &proof.signature_a)
1191 || !self
1192 .verifier
1193 .verify(&vi.public_key, &bytes_b, &proof.signature_b)
1194 {
1195 warn!(validator = %proof.validator, "evidence has invalid signatures");
1196 return Ok(());
1197 }
1198
1199 info!(
1200 validator = %proof.validator,
1201 view = %proof.view,
1202 "received valid evidence gossip"
1203 );
1204 if let Err(e) = self.app.on_evidence(&proof) {
1205 warn!(error = %e, "on_evidence callback failed for gossiped proof");
1206 }
1207 if let Some(ref mut store) = self.evidence_store {
1208 store.put_evidence(proof);
1209 }
1210 }
1211 }
1212 Ok(())
1213 }
1214
1215 fn handle_equivocation(&mut self, result: &crate::vote_collector::VoteResult) {
1216 if let Some(ref proof) = result.equivocation {
1217 warn!(
1218 validator = %proof.validator,
1219 view = %proof.view,
1220 "equivocation detected!"
1221 );
1222 if let Err(e) = self.app.on_evidence(proof) {
1223 warn!(error = %e, "on_evidence callback failed");
1224 }
1225 self.network.broadcast_evidence(proof);
1226 if let Some(ref mut store) = self.evidence_store {
1227 store.put_evidence(proof.clone());
1228 }
1229 }
1230 }
1231
1232 async fn on_qc_formed(&mut self, qc: QuorumCertificate) {
1233 self.current_view_qc = Some(qc.clone());
1235
1236 view_protocol::on_votes_collected(
1237 &mut self.state,
1238 qc.clone(),
1239 self.network.as_ref(),
1240 self.signer.as_ref(),
1241 );
1242
1243 let vote_extension = {
1246 let store = self.store.read().await;
1247 store.get_block(&qc.block_hash).and_then(|block| {
1248 let ctx = BlockContext {
1249 height: block.height,
1250 view: self.state.current_view,
1251 proposer: block.proposer,
1252 epoch: self.state.current_epoch.number,
1253 epoch_start_view: self.state.current_epoch.start_view,
1254 validator_set: &self.state.validator_set,
1255 };
1256 self.app.extend_vote(&block, &ctx)
1257 })
1258 };
1259 let vote_bytes = Vote::signing_bytes(
1260 &self.state.chain_id_hash,
1261 self.state.current_epoch.number,
1262 self.state.current_view,
1263 &qc.block_hash,
1264 VoteType::Vote2,
1265 );
1266 let signature = self.signer.sign(&vote_bytes);
1267 let vote = Vote {
1268 block_hash: qc.block_hash,
1269 view: self.state.current_view,
1270 validator: self.state.validator_id,
1271 signature,
1272 vote_type: VoteType::Vote2,
1273 extension: vote_extension,
1274 };
1275
1276 self.state.update_locked_qc(&qc);
1278
1279 let next_leader_id =
1280 leader::next_leader(&self.state.validator_set, self.state.current_view);
1281 if next_leader_id == self.state.validator_id {
1282 match self.vote_collector.add_vote(
1284 &self.state.validator_set,
1285 vote,
1286 self.state.current_epoch.number,
1287 ) {
1288 Ok(result) => {
1289 self.handle_equivocation(&result);
1290 if let Some(outer_qc) = result.qc {
1291 self.on_double_cert_formed(outer_qc).await;
1292 }
1293 }
1294 Err(e) => warn!(error = %e, "failed to add self vote2"),
1295 }
1296 } else {
1297 self.network
1298 .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
1299 }
1300 }
1301
1302 async fn on_double_cert_formed(&mut self, outer_qc: QuorumCertificate) {
1303 let inner_qc = match self.current_view_qc.take() {
1305 Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
1306 _ => {
1307 match &self.state.locked_qc {
1309 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
1310 _ => match &self.state.highest_qc {
1311 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
1312 _ => {
1313 warn!(
1314 validator = %self.state.validator_id,
1315 "double cert formed but can't find matching inner QC"
1316 );
1317 return;
1318 }
1319 },
1320 }
1321 }
1322 };
1323
1324 let dc = DoubleCertificate { inner_qc, outer_qc };
1325
1326 info!(
1327 validator = %self.state.validator_id,
1328 view = %self.state.current_view,
1329 hash = %dc.inner_qc.block_hash,
1330 "double certificate formed, committing"
1331 );
1332
1333 self.apply_commit(&dc, "double-cert").await;
1335
1336 self.state.highest_double_cert = Some(dc.clone());
1337
1338 self.advance_view(ViewEntryTrigger::DoubleCert(dc)).await;
1340 }
1341
1342 async fn handle_timeout(&mut self) {
1343 let has_power = self.state.validator_set.power_of(self.state.validator_id) > 0;
1347 if !has_power {
1348 self.pacemaker.on_timeout();
1349 return;
1350 }
1351
1352 info!(
1353 validator = %self.state.validator_id,
1354 view = %self.state.current_view,
1355 "view timeout, sending wish"
1356 );
1357
1358 let wish = self.pacemaker.build_wish(
1359 &self.state.chain_id_hash,
1360 self.state.current_epoch.number,
1361 self.state.current_view,
1362 self.state.validator_id,
1363 self.state.highest_qc.clone(),
1364 self.signer.as_ref(),
1365 );
1366
1367 self.network.broadcast(wish.clone());
1368
1369 if let ConsensusMessage::Wish {
1371 target_view,
1372 validator,
1373 highest_qc,
1374 signature,
1375 } = wish
1376 && let Some(tc) = self.pacemaker.add_wish(
1377 &self.state.validator_set,
1378 target_view,
1379 validator,
1380 highest_qc,
1381 signature,
1382 )
1383 {
1384 self.network
1385 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1386 self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1387 return;
1388 }
1389
1390 self.pacemaker.on_timeout();
1392 }
1393
1394 async fn apply_commit(&mut self, dc: &DoubleCertificate, context: &str) {
1397 let store = self.store.read().await;
1398 match try_commit(
1399 dc,
1400 store.as_ref(),
1401 self.app.as_ref(),
1402 &mut self.state.last_committed_height,
1403 &self.state.current_epoch,
1404 ) {
1405 Ok(result) => {
1406 if !result.committed_blocks.is_empty() {
1407 self.state.last_app_hash = result.last_app_hash;
1408 }
1409 if result.pending_epoch.is_some() {
1410 self.pending_epoch = result.pending_epoch;
1411 }
1412 drop(store);
1413 {
1414 let mut s = self.store.write().await;
1415 for block in &result.committed_blocks {
1416 if result.commit_qc.block_hash == block.hash {
1422 s.put_commit_qc(block.height, result.commit_qc.clone());
1423 }
1424 }
1425 s.flush();
1426 }
1427 if let Some(ref mut ev_store) = self.evidence_store {
1429 for block in &result.committed_blocks {
1430 for proof in ev_store.get_pending() {
1431 if proof.view <= block.view {
1432 ev_store.mark_committed(proof.view, proof.validator);
1433 }
1434 }
1435 }
1436 }
1437 self.persist_state();
1441 }
1442 Err(e) => {
1443 warn!(error = %e, "try_commit failed during {context}");
1444 drop(store);
1445 }
1446 }
1447 }
1448
1449 fn validate_double_cert(&self, dc: &DoubleCertificate) -> bool {
1461 if dc.inner_qc.block_hash != dc.outer_qc.block_hash {
1462 warn!("double cert inner/outer block_hash mismatch");
1463 return false;
1464 }
1465 let vs = &self.state.validator_set;
1466 let inner_bytes = Vote::signing_bytes(
1467 &self.state.chain_id_hash,
1468 dc.inner_qc.epoch,
1469 dc.inner_qc.view,
1470 &dc.inner_qc.block_hash,
1471 VoteType::Vote,
1472 );
1473 if !self
1474 .verifier
1475 .verify_aggregate(vs, &inner_bytes, &dc.inner_qc.aggregate_signature)
1476 {
1477 warn!("double cert inner QC signature invalid");
1478 return false;
1479 }
1480 if !hotmint_crypto::has_quorum(vs, &dc.inner_qc.aggregate_signature) {
1481 warn!("double cert inner QC below quorum threshold");
1482 return false;
1483 }
1484 let outer_bytes = Vote::signing_bytes(
1485 &self.state.chain_id_hash,
1486 dc.outer_qc.epoch,
1487 dc.outer_qc.view,
1488 &dc.outer_qc.block_hash,
1489 VoteType::Vote2,
1490 );
1491 if !self
1492 .verifier
1493 .verify_aggregate(vs, &outer_bytes, &dc.outer_qc.aggregate_signature)
1494 {
1495 warn!("double cert outer QC signature invalid");
1496 return false;
1497 }
1498 if !hotmint_crypto::has_quorum(vs, &dc.outer_qc.aggregate_signature) {
1499 warn!("double cert outer QC below quorum threshold");
1500 return false;
1501 }
1502 true
1503 }
1504
1505 fn persist_state(&mut self) {
1506 if let Some(p) = self.persistence.as_mut() {
1507 p.save_current_view(self.state.current_view);
1508 if let Some(ref qc) = self.state.locked_qc {
1509 p.save_locked_qc(qc);
1510 }
1511 if let Some(ref qc) = self.state.highest_qc {
1512 p.save_highest_qc(qc);
1513 }
1514 p.save_last_committed_height(self.state.last_committed_height);
1515 p.save_current_epoch(&self.state.current_epoch);
1516 p.save_last_app_hash(self.state.last_app_hash);
1517 p.flush();
1518 }
1519 }
1520
1521 async fn advance_view(&mut self, trigger: ViewEntryTrigger) {
1522 let new_view = match &trigger {
1523 ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
1524 ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
1525 ViewEntryTrigger::Genesis => ViewNumber(1),
1526 };
1527 self.advance_view_to(new_view, trigger).await;
1528 }
1529
1530 async fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
1531 if new_view <= self.state.current_view {
1532 return;
1533 }
1534
1535 let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
1537
1538 self.vote_collector.clear_view(self.state.current_view);
1539 self.vote_collector.prune_before(self.state.current_view);
1540 self.pacemaker.clear_view(self.state.current_view);
1541 self.pacemaker.prune_before(self.state.current_view);
1542 self.status_senders.clear();
1543 self.current_view_qc = None;
1544
1545 if self
1549 .pending_epoch
1550 .as_ref()
1551 .is_some_and(|e| new_view >= e.start_view)
1552 {
1553 let Some(new_epoch) = self.pending_epoch.take() else {
1555 unreachable!("pending_epoch was Some in the condition check");
1556 };
1557 info!(
1558 validator = %self.state.validator_id,
1559 old_epoch = %self.state.current_epoch.number,
1560 new_epoch = %new_epoch.number,
1561 start_view = %new_epoch.start_view,
1562 validators = new_epoch.validator_set.validator_count(),
1563 "epoch transition"
1564 );
1565 self.state.validator_set = new_epoch.validator_set.clone();
1566 self.state.current_epoch = new_epoch;
1567 self.network
1569 .on_epoch_change(self.state.current_epoch.number, &self.state.validator_set);
1570 self.vote_collector = VoteCollector::new();
1572 self.pacemaker = Pacemaker::with_config(self.pacemaker_config.clone());
1573 }
1574
1575 view_protocol::enter_view(
1576 &mut self.state,
1577 new_view,
1578 trigger,
1579 self.network.as_ref(),
1580 self.signer.as_ref(),
1581 );
1582
1583 if is_progress {
1584 self.pacemaker.reset_on_progress();
1585 } else {
1586 self.pacemaker.reset_timer();
1587 }
1588
1589 self.persist_state();
1590
1591 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
1597 self.try_propose().await;
1598 }
1599 }
1600}
1601
#[cfg(test)]
mod tests {
    //! Quorum-enforcement tests; the assertion messages tag them as R-29 and R-32.

    use super::*;

    use std::sync::Arc;

    use tokio::sync::RwLock;

    use hotmint_crypto::{Ed25519Signer, Ed25519Verifier};
    use hotmint_types::Signer as SignerTrait;
    use hotmint_types::certificate::QuorumCertificate;
    use hotmint_types::crypto::AggregateSignature;
    use hotmint_types::epoch::EpochNumber;
    use hotmint_types::validator::{ValidatorId, ValidatorInfo};
    use hotmint_types::vote::{Vote, VoteType};
    use tokio::sync::mpsc;

    use crate::application::NoopApplication;
    use crate::network::NetworkSink;
    use crate::state::ConsensusState;
    use crate::store::MemoryBlockStore;

    /// Network sink that discards every outgoing message.
    struct DevNullNetwork;

    impl NetworkSink for DevNullNetwork {
        fn broadcast(&self, _: ConsensusMessage) {}
        fn send_to(&self, _: ValidatorId, _: ConsensusMessage) {}
    }

    /// BLAKE3 hash of the empty byte string, used as the chain-id hash in these tests.
    fn test_chain_id_hash() -> [u8; 32] {
        let digest = blake3::hash(b"");
        *digest.as_bytes()
    }

    /// Creates four freshly generated Ed25519 signers (ids 0..4) and a validator set in
    /// which each of them has power 1.
    fn make_validator_set_4() -> (ValidatorSet, Vec<Ed25519Signer>) {
        let mut signers = Vec::with_capacity(4);
        let mut infos = Vec::with_capacity(4);
        for i in 0..4 {
            let signer = Ed25519Signer::generate(ValidatorId(i));
            infos.push(ValidatorInfo {
                id: signer.validator_id(),
                public_key: signer.public_key(),
                power: 1,
            });
            signers.push(signer);
        }
        (ValidatorSet::new(infos), signers)
    }

    /// Builds an engine backed by an in-memory block store, a discarding network and a
    /// no-op application, and returns it together with the sender side of its inbox.
    fn make_test_engine(
        vid: ValidatorId,
        vs: ValidatorSet,
        signer: Ed25519Signer,
    ) -> (
        ConsensusEngine,
        mpsc::Sender<(Option<ValidatorId>, ConsensusMessage)>,
    ) {
        let (tx, rx) = mpsc::channel(64);

        let boxed_store: Box<dyn crate::store::BlockStore> = Box::new(MemoryBlockStore::new());
        let store = Arc::new(RwLock::new(boxed_store));

        let config = EngineConfig {
            verifier: Box::new(Ed25519Verifier),
            pacemaker: None,
            persistence: None,
            evidence_store: None,
        };

        let engine = ConsensusEngine::new(
            ConsensusState::new(vid, vs),
            store,
            Box::new(DevNullNetwork),
            Box::new(NoopApplication),
            Box::new(signer),
            rx,
            config,
        );
        (engine, tx)
    }

    /// Builds a QC over `BlockHash::GENESIS` at `ViewNumber::GENESIS` in epoch 0 whose
    /// 4-slot aggregate carries `VoteType::Vote` signatures from exactly the validators at
    /// `signer_indices`.
    fn genesis_vote_qc(signers: &[Ed25519Signer], signer_indices: &[usize]) -> QuorumCertificate {
        let chain_id_hash = test_chain_id_hash();
        let block_hash = BlockHash::GENESIS;
        let view = ViewNumber::GENESIS;
        let vote_bytes = Vote::signing_bytes(
            &chain_id_hash,
            EpochNumber(0),
            view,
            &block_hash,
            VoteType::Vote,
        );

        let mut aggregate_signature = AggregateSignature::new(4);
        for &idx in signer_indices {
            aggregate_signature
                .add(idx, SignerTrait::sign(&signers[idx], &vote_bytes))
                .unwrap();
        }

        QuorumCertificate {
            block_hash,
            view,
            aggregate_signature,
            epoch: EpochNumber(0),
        }
    }

    /// Builds a `Propose` message for a height-1, view-1 block proposed and signed by
    /// validator 1, justified by `justify` and carrying no double certificate.
    fn propose_by_validator_1(
        signers: &[Ed25519Signer],
        justify: QuorumCertificate,
    ) -> ConsensusMessage {
        let chain_id_hash = test_chain_id_hash();

        let mut block = Block::genesis();
        block.height = Height(1);
        block.view = ViewNumber(1);
        block.proposer = ValidatorId(1);
        block.hash = block.compute_hash();

        let proposal_bytes = crate::view_protocol::proposal_signing_bytes(
            &chain_id_hash,
            EpochNumber(0),
            &block,
            &justify,
        );
        let signature = SignerTrait::sign(&signers[1], &proposal_bytes);

        ConsensusMessage::Propose {
            block: Box::new(block),
            justify: Box::new(justify),
            double_cert: None,
            signature,
        }
    }

    /// A proposal whose justify QC is signed by only one of four validators must fail
    /// `verify_message`.
    #[test]
    fn r29_propose_sub_quorum_justify_rejected_by_verify_message() {
        let (vs, signers) = make_validator_set_4();
        let engine_signer = Ed25519Signer::generate(ValidatorId(0));
        let (engine, _tx) = make_test_engine(ValidatorId(0), vs, engine_signer);

        // Only validator 1 signs the justify QC.
        let sub_quorum_qc = genesis_vote_qc(&signers, &[1]);
        let msg = propose_by_validator_1(&signers, sub_quorum_qc);

        assert!(
            !engine.verify_message(&msg),
            "R-29 regression: Propose with sub-quorum justify QC must be rejected by verify_message"
        );
    }

    /// A proposal whose justify QC is signed by three of four validators must pass
    /// `verify_message`.
    #[test]
    fn r29_propose_full_quorum_justify_accepted_by_verify_message() {
        let (vs, signers) = make_validator_set_4();
        let engine_signer = Ed25519Signer::generate(ValidatorId(0));
        let (engine, _tx) = make_test_engine(ValidatorId(0), vs, engine_signer);

        // Validators 0, 1 and 2 sign the justify QC.
        let full_quorum_qc = genesis_vote_qc(&signers, &[0, 1, 2]);
        let msg = propose_by_validator_1(&signers, full_quorum_qc);

        assert!(
            engine.verify_message(&msg),
            "R-29: Propose with full quorum justify QC must pass verify_message"
        );
    }

    /// `has_quorum` must reject a 1-of-4 aggregate and accept a 3-of-4 aggregate.
    #[test]
    fn r32_sub_quorum_highest_qc_fails_has_quorum() {
        let (vs, signers) = make_validator_set_4();

        let chain_id_hash = test_chain_id_hash();
        let hash = BlockHash([1u8; 32]);
        let qc_view = ViewNumber(1);
        let vote_bytes = Vote::signing_bytes(
            &chain_id_hash,
            EpochNumber(0),
            qc_view,
            &hash,
            VoteType::Vote,
        );

        // Aggregates signatures from the first `count` validators into a 4-slot aggregate.
        let sign_first = |count: usize| {
            let mut agg = AggregateSignature::new(4);
            for (slot, signer) in signers.iter().take(count).enumerate() {
                agg.add(slot, SignerTrait::sign(signer, &vote_bytes))
                    .unwrap();
            }
            agg
        };

        let agg = sign_first(1);
        assert!(
            !hotmint_crypto::has_quorum(&vs, &agg),
            "R-32 regression: 1-of-4 signed QC must not satisfy has_quorum"
        );

        let agg_full = sign_first(3);
        assert!(
            hotmint_crypto::has_quorum(&vs, &agg_full),
            "R-32: 3-of-4 signed QC must satisfy has_quorum"
        );
    }
}