1use ruc::*;
2
3use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5
6use crate::application::Application;
7use crate::commit::{CommitResult, try_commit};
8use crate::evidence_store::EvidenceStore;
9use crate::leader;
10use crate::liveness::{LivenessTracker, OfflineEvidence};
11use crate::network::NetworkSink;
12use crate::pacemaker::{Pacemaker, PacemakerConfig};
13use crate::state::{ConsensusState, ViewStep};
14use crate::store::BlockStore;
15use crate::view_protocol::{self, ViewEntryTrigger};
16use crate::vote_collector::VoteCollector;
17
18use hotmint_types::epoch::Epoch;
19use hotmint_types::vote::VoteType;
20use hotmint_types::*;
21use tokio::sync::mpsc;
22use tracing::{info, warn};
23
/// Handle to the block store used by the engine: a boxed [`BlockStore`] trait
/// object guarded by a `parking_lot` read/write lock and wrapped in an [`Arc`].
pub type SharedBlockStore = Arc<parking_lot::RwLock<Box<dyn BlockStore>>>;
26
/// Sink for pieces of consensus state that the engine hands over for saving.
///
/// The engine stores an optional boxed implementation (see
/// `ConsensusEngine::persistence`). Only the method signatures are declared
/// here; when each value actually reaches stable storage is up to the
/// implementor.
pub trait StatePersistence: Send {
    /// Hand over the view to be saved as the current view.
    fn save_current_view(&mut self, view: ViewNumber);
    /// Hand over the quorum certificate to be saved as the locked QC.
    fn save_locked_qc(&mut self, qc: &QuorumCertificate);
    /// Hand over the quorum certificate to be saved as the highest known QC.
    fn save_highest_qc(&mut self, qc: &QuorumCertificate);
    /// Hand over the height to be saved as the last committed height.
    fn save_last_committed_height(&mut self, height: Height);
    /// Hand over the epoch to be saved as the current epoch.
    fn save_current_epoch(&mut self, epoch: &Epoch);
    /// Hand over the hash to be saved as the last application hash.
    fn save_last_app_hash(&mut self, hash: BlockHash);
    /// Flush previously saved values.
    ///
    /// NOTE(review): takes `&self` while every `save_*` method takes
    /// `&mut self`; presumably implementations flush through interior
    /// mutability or an external handle — confirm against implementors.
    fn flush(&self);
}
37
/// Write-ahead log hooks around block commits.
///
/// Both methods are fallible I/O operations. In this file the engine calls
/// [`Wal::log_commit_intent`] before handing a proposal that carries a double
/// certificate to the view protocol, and only logs a warning if the call fails.
pub trait Wal: Send {
    /// Record that a commit up to `target_height` is about to be attempted.
    fn log_commit_intent(&mut self, target_height: Height) -> std::io::Result<()>;
    /// Record that the commit up to `target_height` has finished.
    // NOTE(review): no caller is visible in this part of the file — confirm
    // where the matching "done" record is written.
    fn log_commit_done(&mut self, target_height: Height) -> std::io::Result<()>;
}
45
/// The consensus driver: owns the protocol state and every collaborator it
/// needs, and is consumed by `ConsensusEngine::run`.
pub struct ConsensusEngine {
    /// Protocol state (current view, epoch, validator set, known QCs, ...).
    state: ConsensusState,
    /// Shared block store; locked for reading or writing around each access.
    store: SharedBlockStore,
    /// Outbound side of the network (broadcast / send-to-peer).
    network: Box<dyn NetworkSink>,
    /// Application callbacks (info, vote extensions, evidence, ...).
    app: Box<dyn Application>,
    /// Signs this validator's votes.
    signer: Box<dyn Signer>,
    /// Verifies individual and aggregate signatures on inbound messages.
    verifier: Box<dyn Verifier>,
    /// Accumulates votes; `add_vote` reports a QC and/or equivocation proof.
    vote_collector: VoteCollector,
    /// View timer and wish aggregation.
    pacemaker: Pacemaker,
    /// The configuration `pacemaker` was constructed from.
    pacemaker_config: PacemakerConfig,
    /// Inbound consensus messages, each paired with an optional sender id.
    msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
    /// Validators whose `StatusCert` was accepted while this node, as leader,
    /// was waiting for status messages.
    status_senders: HashSet<ValidatorId>,
    /// QC stored by `on_qc_formed` and consumed by `on_double_cert_formed`.
    current_view_qc: Option<QuorumCertificate>,
    /// Epoch reported by proposal processing as pending.
    // NOTE(review): where it is applied is outside the visible part of the file.
    pending_epoch: Option<Epoch>,
    /// Optional persistence sink for consensus state.
    persistence: Option<Box<dyn StatePersistence>>,
    /// Optional store for equivocation evidence.
    evidence_store: Option<Box<dyn EvidenceStore>>,
    /// Liveness bookkeeping.
    // NOTE(review): not used in the visible part of the file.
    liveness_tracker: LivenessTracker,
    /// Optional write-ahead log for commits.
    wal: Option<Box<dyn Wal>>,
}
72
/// Configuration bundle passed to `ConsensusEngine::new`.
///
/// Only `verifier` is mandatory; every other field is optional.
pub struct EngineConfig {
    /// Signature verifier used for inbound messages.
    pub verifier: Box<dyn Verifier>,
    /// Pacemaker settings; `PacemakerConfig::default()` is used when `None`.
    pub pacemaker: Option<PacemakerConfig>,
    /// Optional persistence sink for consensus state.
    pub persistence: Option<Box<dyn StatePersistence>>,
    /// Optional store for equivocation evidence.
    pub evidence_store: Option<Box<dyn EvidenceStore>>,
    /// Optional write-ahead log for commits.
    pub wal: Option<Box<dyn Wal>>,
}
81
82impl EngineConfig {
83 pub fn new(verifier: Box<dyn Verifier>) -> Self {
86 Self {
87 verifier,
88 pacemaker: None,
89 persistence: None,
90 evidence_store: None,
91 wal: None,
92 }
93 }
94
95 pub fn with_pacemaker(mut self, pacemaker: PacemakerConfig) -> Self {
97 self.pacemaker = Some(pacemaker);
98 self
99 }
100
101 pub fn with_persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
103 self.persistence = Some(persistence);
104 self
105 }
106}
107
/// Step-by-step builder for [`ConsensusEngine`].
///
/// Every slot starts empty. `build` fails if any of `state`, `store`,
/// `network`, `app`, `signer`, `msg_rx` or `verifier` is still unset; the
/// remaining slots are optional.
pub struct ConsensusEngineBuilder {
    /// Required: initial consensus state.
    state: Option<ConsensusState>,
    /// Required: shared block store.
    store: Option<SharedBlockStore>,
    /// Required: outbound network sink.
    network: Option<Box<dyn NetworkSink>>,
    /// Required: application callbacks.
    app: Option<Box<dyn Application>>,
    /// Required: signer for this validator.
    signer: Option<Box<dyn Signer>>,
    /// Required: inbound message channel.
    msg_rx: Option<mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>>,
    /// Required: signature verifier.
    verifier: Option<Box<dyn Verifier>>,
    /// Optional: pacemaker settings.
    pacemaker: Option<PacemakerConfig>,
    /// Optional: persistence sink.
    persistence: Option<Box<dyn StatePersistence>>,
    /// Optional: evidence store.
    evidence_store: Option<Box<dyn EvidenceStore>>,
    /// Optional: write-ahead log.
    wal: Option<Box<dyn Wal>>,
}
136
137impl ConsensusEngineBuilder {
138 pub fn new() -> Self {
139 Self {
140 state: None,
141 store: None,
142 network: None,
143 app: None,
144 signer: None,
145 msg_rx: None,
146 verifier: None,
147 pacemaker: None,
148 persistence: None,
149 evidence_store: None,
150 wal: None,
151 }
152 }
153
154 pub fn state(mut self, state: ConsensusState) -> Self {
155 self.state = Some(state);
156 self
157 }
158
159 pub fn store(mut self, store: SharedBlockStore) -> Self {
160 self.store = Some(store);
161 self
162 }
163
164 pub fn network(mut self, network: Box<dyn NetworkSink>) -> Self {
165 self.network = Some(network);
166 self
167 }
168
169 pub fn app(mut self, app: Box<dyn Application>) -> Self {
170 self.app = Some(app);
171 self
172 }
173
174 pub fn signer(mut self, signer: Box<dyn Signer>) -> Self {
175 self.signer = Some(signer);
176 self
177 }
178
179 pub fn messages(
180 mut self,
181 msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
182 ) -> Self {
183 self.msg_rx = Some(msg_rx);
184 self
185 }
186
187 pub fn verifier(mut self, verifier: Box<dyn Verifier>) -> Self {
188 self.verifier = Some(verifier);
189 self
190 }
191
192 pub fn pacemaker(mut self, config: PacemakerConfig) -> Self {
193 self.pacemaker = Some(config);
194 self
195 }
196
197 pub fn persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
198 self.persistence = Some(persistence);
199 self
200 }
201
202 pub fn evidence_store(mut self, store: Box<dyn EvidenceStore>) -> Self {
203 self.evidence_store = Some(store);
204 self
205 }
206
207 pub fn wal(mut self, wal: Box<dyn Wal>) -> Self {
208 self.wal = Some(wal);
209 self
210 }
211
212 pub fn build(self) -> ruc::Result<ConsensusEngine> {
213 let state = self.state.ok_or_else(|| ruc::eg!("state is required"))?;
214 let store = self.store.ok_or_else(|| ruc::eg!("store is required"))?;
215 let network = self
216 .network
217 .ok_or_else(|| ruc::eg!("network is required"))?;
218 let app = self.app.ok_or_else(|| ruc::eg!("app is required"))?;
219 let signer = self.signer.ok_or_else(|| ruc::eg!("signer is required"))?;
220 let msg_rx = self
221 .msg_rx
222 .ok_or_else(|| ruc::eg!("messages (msg_rx) is required"))?;
223 let verifier = self
224 .verifier
225 .ok_or_else(|| ruc::eg!("verifier is required"))?;
226
227 let config = EngineConfig {
228 verifier,
229 pacemaker: self.pacemaker,
230 persistence: self.persistence,
231 evidence_store: self.evidence_store,
232 wal: self.wal,
233 };
234
235 Ok(ConsensusEngine::new(
236 state, store, network, app, signer, msg_rx, config,
237 ))
238 }
239}
240
241impl Default for ConsensusEngineBuilder {
242 fn default() -> Self {
243 Self::new()
244 }
245}
246
247impl ConsensusEngine {
248 pub fn new(
249 state: ConsensusState,
250 store: SharedBlockStore,
251 network: Box<dyn NetworkSink>,
252 app: Box<dyn Application>,
253 signer: Box<dyn Signer>,
254 msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
255 config: EngineConfig,
256 ) -> Self {
257 let pc = config.pacemaker.unwrap_or_default();
258 Self {
259 state,
260 store,
261 network,
262 app,
263 signer,
264 verifier: config.verifier,
265 vote_collector: VoteCollector::new(),
266 pacemaker: Pacemaker::with_config(pc.clone()),
267 pacemaker_config: pc,
268 msg_rx,
269 status_senders: HashSet::new(),
270 current_view_qc: None,
271 pending_epoch: None,
272 persistence: config.persistence,
273 evidence_store: config.evidence_store,
274 liveness_tracker: LivenessTracker::new(),
275 wal: config.wal,
276 }
277 }
278
279 pub async fn run(mut self) {
282 let app_info = self.app.info();
284 if app_info.last_block_height.as_u64() > 0
285 && app_info.last_block_height != self.state.last_committed_height
286 {
287 warn!(
288 app_height = app_info.last_block_height.as_u64(),
289 consensus_height = self.state.last_committed_height.as_u64(),
290 "application height differs from consensus state — possible state divergence"
291 );
292 }
293
294 if self.state.current_view.as_u64() <= 1 {
295 self.enter_genesis_view().await;
296 } else {
297 info!(
298 validator = %self.state.validator_id,
299 view = %self.state.current_view,
300 height = %self.state.last_committed_height,
301 "resuming from persisted state"
302 );
303 self.pacemaker.reset_timer();
304 }
305
306 loop {
307 let deadline = self.pacemaker.sleep_until_deadline();
308 tokio::pin!(deadline);
309
310 tokio::select! {
311 Some((sender, msg)) = self.msg_rx.recv() => {
312 if let Err(e) = self.handle_message(sender, msg).await {
313 warn!(validator = %self.state.validator_id, error = %e, "error handling message");
314 }
315 }
316 _ = &mut deadline => {
317 self.handle_timeout().await;
318 }
319 }
320 }
321 }
322
323 async fn enter_genesis_view(&mut self) {
324 let genesis_qc = QuorumCertificate {
326 block_hash: BlockHash::GENESIS,
327 view: ViewNumber::GENESIS,
328 aggregate_signature: AggregateSignature::new(
329 self.state.validator_set.validator_count(),
330 ),
331 epoch: self.state.current_epoch.number,
332 };
333 self.state.highest_qc = Some(genesis_qc);
334
335 let view = ViewNumber(1);
336 view_protocol::enter_view(
337 &mut self.state,
338 view,
339 ViewEntryTrigger::Genesis,
340 self.network.as_ref(),
341 self.signer.as_ref(),
342 );
343 self.pacemaker.reset_timer();
344
345 if self.state.is_leader() {
347 self.state.step = ViewStep::WaitingForStatus;
348 self.try_propose().await;
350 }
351 }
352
353 fn try_propose(
354 &mut self,
355 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
356 Box::pin(async move {
357 let pending_evidence = self
359 .evidence_store
360 .as_ref()
361 .map(|s| s.get_pending())
362 .unwrap_or_default();
363
364 let proposed_block = {
366 let mut store = self.store.write();
367 view_protocol::propose(
368 &mut self.state,
369 store.as_mut(),
370 self.network.as_ref(),
371 self.app.as_ref(),
372 self.signer.as_ref(),
373 pending_evidence,
374 )
375 }; match proposed_block {
378 Ok(block) => {
379 self.leader_self_vote(block.hash).await;
381 }
382 Err(e) => {
383 warn!(
384 validator = %self.state.validator_id,
385 error = %e,
386 "failed to propose"
387 );
388 }
389 }
390 })
391 }
392
393 async fn leader_self_vote(&mut self, block_hash: BlockHash) {
394 let vote_bytes = Vote::signing_bytes(
395 &self.state.chain_id_hash,
396 self.state.current_epoch.number,
397 self.state.current_view,
398 &block_hash,
399 VoteType::Vote,
400 );
401 let signature = self.signer.sign(&vote_bytes);
402 let vote = Vote {
403 block_hash,
404 view: self.state.current_view,
405 validator: self.state.validator_id,
406 signature,
407 vote_type: VoteType::Vote,
408 extension: None,
409 };
410 match self.vote_collector.add_vote(
411 &self.state.validator_set,
412 vote,
413 self.state.current_epoch.number,
414 ) {
415 Ok(result) => {
416 self.handle_equivocation(&result);
417 if let Some(qc) = result.qc {
418 self.on_qc_formed(qc).await;
419 }
420 }
421 Err(e) => warn!(error = %e, "failed to add self vote"),
422 }
423 }
424}
425
426pub fn verify_relay_sender(
443 sender: ValidatorId,
444 msg: &ConsensusMessage,
445 validator_keys: &HashMap<ValidatorId, hotmint_types::crypto::PublicKey>,
446 ordered_validators: &[ValidatorId],
447 chain_id_hash: &[u8; 32],
448 epoch: hotmint_types::epoch::EpochNumber,
449) -> bool {
450 use hotmint_crypto::Ed25519Verifier;
451 use hotmint_types::Verifier;
452 use hotmint_types::vote::Vote;
453 let verifier = Ed25519Verifier;
454 match msg {
455 ConsensusMessage::Propose {
456 block,
457 justify,
458 signature,
459 ..
460 } => {
461 let Some(pk) = validator_keys.get(&block.proposer) else {
462 return false;
463 };
464 let bytes =
465 crate::view_protocol::proposal_signing_bytes(chain_id_hash, epoch, block, justify);
466 Verifier::verify(&verifier, pk, &bytes, signature)
467 }
468 ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
469 let Some(pk) = validator_keys.get(&vote.validator) else {
470 return false;
471 };
472 let bytes = Vote::signing_bytes(
473 chain_id_hash,
474 epoch,
475 vote.view,
476 &vote.block_hash,
477 vote.vote_type,
478 );
479 Verifier::verify(&verifier, pk, &bytes, &vote.signature)
480 }
481 ConsensusMessage::Prepare {
482 certificate,
483 signature,
484 } => {
485 if !ordered_validators.is_empty() {
488 let n = ordered_validators.len();
489 let expected_leader = ordered_validators[certificate.view.as_u64() as usize % n];
490 if sender != expected_leader {
491 return false;
492 }
493 }
494 let Some(pk) = validator_keys.get(&sender) else {
495 return false;
496 };
497 let bytes =
498 crate::view_protocol::prepare_signing_bytes(chain_id_hash, epoch, certificate);
499 Verifier::verify(&verifier, pk, &bytes, signature)
500 }
501 ConsensusMessage::Wish {
502 target_view,
503 validator,
504 highest_qc,
505 signature,
506 } => {
507 let Some(pk) = validator_keys.get(validator) else {
508 return false;
509 };
510 let bytes = crate::pacemaker::wish_signing_bytes(
511 chain_id_hash,
512 epoch,
513 *target_view,
514 highest_qc.as_ref(),
515 );
516 Verifier::verify(&verifier, pk, &bytes, signature)
517 }
518 ConsensusMessage::TimeoutCert(tc) => {
519 let target_view = ViewNumber(tc.view.as_u64() + 1);
521 let n = ordered_validators.len();
522 if n == 0 || tc.aggregate_signature.signers.len() != n {
523 return false;
524 }
525 let mut sig_idx = 0usize;
526 let mut verified_count = 0usize;
527 for (i, &signed) in tc.aggregate_signature.signers.iter().enumerate() {
528 if !signed {
529 continue;
530 }
531 if i >= n {
532 return false;
533 }
534 let vid = ordered_validators[i];
535 let Some(pk) = validator_keys.get(&vid) else {
536 return false;
537 };
538 let hqc = tc.highest_qcs.get(i).and_then(|h| h.as_ref());
539 let bytes =
540 crate::pacemaker::wish_signing_bytes(chain_id_hash, epoch, target_view, hqc);
541 if sig_idx >= tc.aggregate_signature.signatures.len() {
542 return false;
543 }
544 if !Verifier::verify(
545 &verifier,
546 pk,
547 &bytes,
548 &tc.aggregate_signature.signatures[sig_idx],
549 ) {
550 return false;
551 }
552 sig_idx += 1;
553 verified_count += 1;
554 }
555 if sig_idx != tc.aggregate_signature.signatures.len() {
556 return false;
557 }
558 verified_count * 3 > n * 2
561 }
562 ConsensusMessage::StatusCert {
563 validator,
564 signature,
565 locked_qc,
566 ..
567 } => {
568 let Some(pk) = validator_keys.get(validator) else {
574 return false;
575 };
576 let _ = (pk, signature, locked_qc);
580 true
581 }
582 ConsensusMessage::Evidence(_) => {
583 validator_keys.contains_key(&sender)
586 }
587 }
588}
589
590impl ConsensusEngine {
591 fn verification_epochs(&self) -> [EpochNumber; 2] {
595 let cur = self.state.current_epoch.number;
596 let prev = if cur.as_u64() > 0 {
597 EpochNumber(cur.as_u64() - 1)
598 } else {
599 cur
600 };
601 [cur, prev]
602 }
603
604 fn verify_message(&self, msg: &ConsensusMessage) -> bool {
612 let msg_view = match msg {
617 ConsensusMessage::Propose { .. } => None, ConsensusMessage::VoteMsg(v) | ConsensusMessage::Vote2Msg(v) => Some(v.view),
619 ConsensusMessage::Prepare { certificate, .. } => Some(certificate.view),
620 ConsensusMessage::Wish { target_view, .. } => Some(*target_view),
621 ConsensusMessage::TimeoutCert(tc) => Some(ViewNumber(tc.view.as_u64() + 1)),
622 ConsensusMessage::StatusCert { .. } => None,
623 ConsensusMessage::Evidence(_) => None, };
625 if let Some(v) = msg_view
626 && v < self.state.current_view
627 {
628 return true; }
630
631 let vs = &self.state.validator_set;
632 match msg {
633 ConsensusMessage::Propose {
634 block,
635 justify,
636 signature,
637 ..
638 } => {
639 let proposer = vs.get(block.proposer);
640 let Some(vi) = proposer else {
641 warn!(proposer = %block.proposer, "propose from unknown validator");
642 return false;
643 };
644 let mut proposal_ok = false;
645 for epoch in self.verification_epochs() {
646 let bytes = view_protocol::proposal_signing_bytes(
647 &self.state.chain_id_hash,
648 epoch,
649 block,
650 justify,
651 );
652 if self.verifier.verify(&vi.public_key, &bytes, signature) {
653 proposal_ok = true;
654 break;
655 }
656 }
657 if !proposal_ok {
658 warn!(proposer = %block.proposer, "invalid proposal signature");
659 return false;
660 }
661 if justify.aggregate_signature.count() > 0 {
663 let qc_bytes = Vote::signing_bytes(
664 &self.state.chain_id_hash,
665 justify.epoch,
666 justify.view,
667 &justify.block_hash,
668 VoteType::Vote,
669 );
670 if !self
671 .verifier
672 .verify_aggregate(vs, &qc_bytes, &justify.aggregate_signature)
673 {
674 warn!(proposer = %block.proposer, "invalid justify QC aggregate signature");
675 return false;
676 }
677 if justify.epoch == self.state.current_epoch.number
678 && !hotmint_crypto::has_quorum(vs, &justify.aggregate_signature)
679 {
680 warn!(proposer = %block.proposer, "justify QC below quorum threshold");
681 return false;
682 }
683 }
684 true
685 }
686 ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
687 let Some(vi) = vs.get(vote.validator) else {
688 warn!(validator = %vote.validator, "vote from unknown validator");
689 return false;
690 };
691 let mut ok = false;
692 for epoch in self.verification_epochs() {
693 let bytes = Vote::signing_bytes(
694 &self.state.chain_id_hash,
695 epoch,
696 vote.view,
697 &vote.block_hash,
698 vote.vote_type,
699 );
700 if self
701 .verifier
702 .verify(&vi.public_key, &bytes, &vote.signature)
703 {
704 ok = true;
705 break;
706 }
707 }
708 if !ok {
709 warn!(validator = %vote.validator, "invalid vote signature");
710 return false;
711 }
712 true
713 }
714 ConsensusMessage::Prepare {
715 certificate,
716 signature,
717 } => {
718 let Some(leader) = vs.leader_for_view(certificate.view) else {
720 return false;
721 };
722 let mut prepare_ok = false;
723 for epoch in self.verification_epochs() {
724 let bytes = view_protocol::prepare_signing_bytes(
725 &self.state.chain_id_hash,
726 epoch,
727 certificate,
728 );
729 if self.verifier.verify(&leader.public_key, &bytes, signature) {
730 prepare_ok = true;
731 break;
732 }
733 }
734 if !prepare_ok {
735 warn!(view = %certificate.view, "invalid prepare signature");
736 return false;
737 }
738 let qc_bytes = Vote::signing_bytes(
740 &self.state.chain_id_hash,
741 certificate.epoch,
742 certificate.view,
743 &certificate.block_hash,
744 VoteType::Vote,
745 );
746 if !self
747 .verifier
748 .verify_aggregate(vs, &qc_bytes, &certificate.aggregate_signature)
749 {
750 warn!(view = %certificate.view, "invalid QC aggregate signature");
751 return false;
752 }
753 if certificate.epoch == self.state.current_epoch.number
754 && !hotmint_crypto::has_quorum(vs, &certificate.aggregate_signature)
755 {
756 warn!(view = %certificate.view, "Prepare QC below quorum threshold");
757 return false;
758 }
759 true
760 }
761 ConsensusMessage::Wish {
762 target_view,
763 validator,
764 highest_qc,
765 signature,
766 } => {
767 let Some(vi) = vs.get(*validator) else {
768 warn!(validator = %validator, "wish from unknown validator");
769 return false;
770 };
771 let mut wish_ok = false;
773 for epoch in self.verification_epochs() {
774 let bytes = crate::pacemaker::wish_signing_bytes(
775 &self.state.chain_id_hash,
776 epoch,
777 *target_view,
778 highest_qc.as_ref(),
779 );
780 if self.verifier.verify(&vi.public_key, &bytes, signature) {
781 wish_ok = true;
782 break;
783 }
784 }
785 if !wish_ok {
786 warn!(validator = %validator, "invalid wish signature");
787 return false;
788 }
789 true
790 }
791 ConsensusMessage::TimeoutCert(tc) => {
792 let target_view = ViewNumber(tc.view.as_u64() + 1);
798 let n = vs.validator_count();
799 if tc.aggregate_signature.signers.len() != n {
800 warn!(view = %tc.view, "TC signers bitfield length mismatch");
801 return false;
802 }
803 let mut sig_idx = 0usize;
804 let mut power = 0u64;
805 for (i, &signed) in tc.aggregate_signature.signers.iter().enumerate() {
806 if !signed {
807 continue;
808 }
809 let Some(vi) = vs.validators().get(i) else {
810 warn!(view = %tc.view, validator_idx = i, "TC signer index out of validator set");
811 return false;
812 };
813 let hqc = tc.highest_qcs.get(i).and_then(|h| h.as_ref());
814 if sig_idx >= tc.aggregate_signature.signatures.len() {
815 warn!(view = %tc.view, "TC aggregate_signature has fewer sigs than signers");
816 return false;
817 }
818 let mut tc_sig_ok = false;
819 for epoch in self.verification_epochs() {
820 let bytes = crate::pacemaker::wish_signing_bytes(
821 &self.state.chain_id_hash,
822 epoch,
823 target_view,
824 hqc,
825 );
826 if self.verifier.verify(
827 &vi.public_key,
828 &bytes,
829 &tc.aggregate_signature.signatures[sig_idx],
830 ) {
831 tc_sig_ok = true;
832 break;
833 }
834 }
835 if !tc_sig_ok {
836 warn!(view = %tc.view, validator = %vi.id, "TC signer signature invalid");
837 return false;
838 }
839 power += vs.power_of(vi.id);
840 sig_idx += 1;
841 }
842 if sig_idx != tc.aggregate_signature.signatures.len() {
843 warn!(view = %tc.view, "TC has extra signatures beyond bitfield");
844 return false;
845 }
846 if power < vs.quorum_threshold() {
847 warn!(view = %tc.view, power, threshold = vs.quorum_threshold(), "TC insufficient quorum");
848 return false;
849 }
850 true
851 }
852 ConsensusMessage::StatusCert {
853 locked_qc,
854 validator,
855 signature,
856 } => {
857 let Some(vi) = vs.get(*validator) else {
858 warn!(validator = %validator, "status from unknown validator");
859 return false;
860 };
861 let mut status_ok = false;
862 for epoch in self.verification_epochs() {
863 let bytes = view_protocol::status_signing_bytes(
864 &self.state.chain_id_hash,
865 epoch,
866 self.state.current_view,
867 locked_qc,
868 );
869 if self.verifier.verify(&vi.public_key, &bytes, signature) {
870 status_ok = true;
871 break;
872 }
873 }
874 if !status_ok {
875 warn!(validator = %validator, "invalid status signature");
876 return false;
877 }
878 true
879 }
880 ConsensusMessage::Evidence(_) => {
881 true
885 }
886 }
887 }
888
889 async fn handle_message(
890 &mut self,
891 _sender: Option<ValidatorId>,
892 msg: ConsensusMessage,
893 ) -> Result<()> {
894 let verified = tokio::task::block_in_place(|| self.verify_message(&msg));
899 if !verified {
900 return Ok(());
901 }
902
903 match msg {
904 ConsensusMessage::Propose {
905 block,
906 justify,
907 double_cert,
908 signature: _,
909 } => {
910 let block = *block;
911 let justify = *justify;
912 let double_cert = double_cert.map(|dc| *dc);
913
914 if block.view > self.state.current_view {
916 if let Some(ref dc) = double_cert {
917 if !tokio::task::block_in_place(|| self.validate_double_cert(dc)) {
918 return Ok(());
919 }
920
921 self.apply_commit(dc, "fast-forward").await;
923 self.state.highest_double_cert = Some(dc.clone());
924 self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()))
925 .await;
926 } else {
927 return Ok(());
928 }
929 } else if block.view < self.state.current_view {
930 if block.height > self.state.last_committed_height {
936 let expected = hotmint_crypto::compute_block_hash(&block);
938 if block.hash == expected {
939 let mut store = self.store.write();
940 store.put_block(block);
941 }
942 }
943 return Ok(());
944 }
945
946 let mut store = self.store.write();
947
948 if let Some(ref dc) = double_cert
953 && !tokio::task::block_in_place(|| self.validate_double_cert(dc))
954 {
955 return Ok(());
956 }
957
958 if justify.aggregate_signature.count() > 0
964 && let Some(justified_block) = store.get_block(&justify.block_hash)
965 && store.get_commit_qc(justified_block.height).is_none()
966 {
967 store.put_commit_qc(justified_block.height, justify.clone());
968 }
969
970 if let Some(ref dc) = double_cert
972 && let Some(ref mut wal) = self.wal
973 && let Some(target_block) = store.get_block(&dc.inner_qc.block_hash)
974 && let Err(e) = wal.log_commit_intent(target_block.height)
975 {
976 warn!(error = %e, "WAL: failed to log commit intent for fast-forward");
977 }
978
979 let proposal_result = view_protocol::on_proposal(
980 &mut self.state,
981 view_protocol::ProposalData {
982 block,
983 justify,
984 double_cert,
985 },
986 store.as_mut(),
987 self.network.as_ref(),
988 self.app.as_ref(),
989 self.signer.as_ref(),
990 )
991 .c(d!())?;
992 drop(store);
993
994 if let Some(result) = proposal_result.commit_result {
997 self.process_commit_result(&result);
998 }
999 if let Some(epoch) = proposal_result.pending_epoch {
1000 self.pending_epoch = Some(epoch);
1001 }
1002 }
1003
1004 ConsensusMessage::VoteMsg(vote) => {
1005 if vote.view != self.state.current_view {
1006 return Ok(());
1007 }
1008 if !self.state.is_leader() {
1009 return Ok(());
1010 }
1011 if vote.vote_type != VoteType::Vote {
1012 return Ok(());
1013 }
1014
1015 let result = self
1016 .vote_collector
1017 .add_vote(
1018 &self.state.validator_set,
1019 vote,
1020 self.state.current_epoch.number,
1021 )
1022 .c(d!())?;
1023 self.handle_equivocation(&result);
1024 if let Some(qc) = result.qc {
1025 self.on_qc_formed(qc).await;
1026 }
1027 }
1028
1029 ConsensusMessage::Prepare {
1030 certificate,
1031 signature: _,
1032 } => {
1033 if certificate.view < self.state.current_view {
1034 return Ok(());
1035 }
1036 if certificate.view == self.state.current_view {
1037 let store = self.store.read();
1042 let block_opt = store.get_block(&certificate.block_hash);
1043 if self.app.tracks_app_hash()
1044 && let Some(ref block) = block_opt
1045 && block.app_hash != self.state.last_app_hash
1046 {
1047 warn!(
1048 block_app_hash = %block.app_hash,
1049 local_app_hash = %self.state.last_app_hash,
1050 "prepare block app_hash mismatch, ignoring"
1051 );
1052 return Ok(());
1053 }
1054
1055 let vote_extension = block_opt.and_then(|block| {
1058 let ctx = BlockContext {
1059 height: block.height,
1060 view: self.state.current_view,
1061 proposer: block.proposer,
1062 epoch: self.state.current_epoch.number,
1063 epoch_start_view: self.state.current_epoch.start_view,
1064 validator_set: &self.state.validator_set,
1065 vote_extensions: vec![],
1066 };
1067 self.app.extend_vote(&block, &ctx)
1068 });
1069 drop(store);
1070
1071 view_protocol::on_prepare(
1072 &mut self.state,
1073 certificate,
1074 self.network.as_ref(),
1075 self.signer.as_ref(),
1076 vote_extension,
1077 );
1078 }
1079 }
1080
1081 ConsensusMessage::Vote2Msg(vote) => {
1082 if vote.view != self.state.current_view {
1083 return Ok(());
1084 }
1085 if vote.vote_type != VoteType::Vote2 {
1086 return Ok(());
1087 }
1088
1089 if let Some(ref ext) = vote.extension
1091 && !self
1092 .app
1093 .verify_vote_extension(ext, &vote.block_hash, vote.validator)
1094 {
1095 warn!(
1096 validator = %vote.validator,
1097 view = %vote.view,
1098 "rejecting vote2: invalid vote extension"
1099 );
1100 return Ok(());
1101 }
1102
1103 let result = self
1104 .vote_collector
1105 .add_vote(
1106 &self.state.validator_set,
1107 vote,
1108 self.state.current_epoch.number,
1109 )
1110 .c(d!())?;
1111 self.handle_equivocation(&result);
1112 if let Some(outer_qc) = result.qc {
1113 self.on_double_cert_formed(outer_qc, result.extensions)
1114 .await;
1115 }
1116 }
1117
1118 ConsensusMessage::Wish {
1119 target_view,
1120 validator,
1121 highest_qc,
1122 signature,
1123 } => {
1124 if let Some(ref qc) = highest_qc
1127 && qc.aggregate_signature.count() > 0
1128 {
1129 let qc_bytes = Vote::signing_bytes(
1130 &self.state.chain_id_hash,
1131 qc.epoch,
1132 qc.view,
1133 &qc.block_hash,
1134 VoteType::Vote,
1135 );
1136 if !tokio::task::block_in_place(|| {
1137 self.verifier.verify_aggregate(
1138 &self.state.validator_set,
1139 &qc_bytes,
1140 &qc.aggregate_signature,
1141 )
1142 }) {
1143 warn!(validator = %validator, "wish carries invalid highest_qc signature");
1144 return Ok(());
1145 }
1146 if qc.epoch == self.state.current_epoch.number
1152 && !hotmint_crypto::has_quorum(
1153 &self.state.validator_set,
1154 &qc.aggregate_signature,
1155 )
1156 {
1157 warn!(validator = %validator, "wish carries highest_qc without quorum");
1158 return Ok(());
1159 }
1160 }
1161
1162 if let Some(tc) = self.pacemaker.add_wish(
1163 &self.state.validator_set,
1164 target_view,
1165 validator,
1166 highest_qc,
1167 signature,
1168 ) {
1169 info!(
1170 validator = %self.state.validator_id,
1171 view = %tc.view,
1172 "TC formed, advancing view"
1173 );
1174 self.network
1175 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1176 self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1177 }
1178 }
1179
1180 ConsensusMessage::TimeoutCert(tc) => {
1181 if self.pacemaker.should_relay_tc(&tc) {
1182 self.network
1183 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1184 }
1185 let new_view = ViewNumber(tc.view.as_u64() + 1);
1186 if new_view > self.state.current_view {
1187 self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1188 }
1189 }
1190
1191 ConsensusMessage::StatusCert {
1192 locked_qc,
1193 validator,
1194 signature: _,
1195 } => {
1196 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
1197 if let Some(ref qc) = locked_qc {
1198 self.state.update_highest_qc(qc);
1199 }
1200 self.status_senders.insert(validator);
1201 let status_power: u64 = self
1202 .status_senders
1203 .iter()
1204 .map(|v| self.state.validator_set.power_of(*v))
1205 .sum();
1206 let total_power =
1208 status_power + self.state.validator_set.power_of(self.state.validator_id);
1209 if total_power >= self.state.validator_set.quorum_threshold() {
1210 self.try_propose().await;
1211 }
1212 }
1213 }
1214
1215 ConsensusMessage::Evidence(proof) => {
1216 let vs = &self.state.validator_set;
1220 let vi = match vs.get(proof.validator) {
1221 Some(vi) => vi,
1222 None => {
1223 warn!(validator = %proof.validator, "evidence for unknown validator");
1224 return Ok(());
1225 }
1226 };
1227 if proof.block_hash_a == proof.block_hash_b {
1228 warn!(validator = %proof.validator, "evidence has identical block hashes");
1229 return Ok(());
1230 }
1231 let bytes_a = Vote::signing_bytes(
1234 &self.state.chain_id_hash,
1235 proof.epoch,
1236 proof.view,
1237 &proof.block_hash_a,
1238 proof.vote_type,
1239 );
1240 let bytes_b = Vote::signing_bytes(
1241 &self.state.chain_id_hash,
1242 proof.epoch,
1243 proof.view,
1244 &proof.block_hash_b,
1245 proof.vote_type,
1246 );
1247 if !self
1248 .verifier
1249 .verify(&vi.public_key, &bytes_a, &proof.signature_a)
1250 || !self
1251 .verifier
1252 .verify(&vi.public_key, &bytes_b, &proof.signature_b)
1253 {
1254 warn!(validator = %proof.validator, "evidence has invalid signatures");
1255 return Ok(());
1256 }
1257
1258 info!(
1259 validator = %proof.validator,
1260 view = %proof.view,
1261 "received valid evidence gossip"
1262 );
1263 if let Err(e) = self.app.on_evidence(&proof) {
1264 warn!(error = %e, "on_evidence callback failed for gossiped proof");
1265 }
1266 if let Some(ref mut store) = self.evidence_store {
1267 store.put_evidence(proof);
1268 }
1269 }
1270 }
1271 Ok(())
1272 }
1273
1274 fn handle_equivocation(&mut self, result: &crate::vote_collector::VoteResult) {
1275 if let Some(ref proof) = result.equivocation {
1276 warn!(
1277 validator = %proof.validator,
1278 view = %proof.view,
1279 "equivocation detected!"
1280 );
1281 if let Err(e) = self.app.on_evidence(proof) {
1282 warn!(error = %e, "on_evidence callback failed");
1283 }
1284 self.network.broadcast_evidence(proof);
1285 if let Some(ref mut store) = self.evidence_store {
1286 store.put_evidence(proof.clone());
1287 }
1288 }
1289 }
1290
1291 async fn on_qc_formed(&mut self, qc: QuorumCertificate) {
1292 self.current_view_qc = Some(qc.clone());
1294
1295 view_protocol::on_votes_collected(
1296 &mut self.state,
1297 qc.clone(),
1298 self.network.as_ref(),
1299 self.signer.as_ref(),
1300 );
1301
1302 let vote_extension = {
1305 let store = self.store.read();
1306 store.get_block(&qc.block_hash).and_then(|block| {
1307 let ctx = BlockContext {
1308 height: block.height,
1309 view: self.state.current_view,
1310 proposer: block.proposer,
1311 epoch: self.state.current_epoch.number,
1312 epoch_start_view: self.state.current_epoch.start_view,
1313 validator_set: &self.state.validator_set,
1314 vote_extensions: vec![],
1315 };
1316 self.app.extend_vote(&block, &ctx)
1317 })
1318 };
1319 let vote_bytes = Vote::signing_bytes(
1320 &self.state.chain_id_hash,
1321 self.state.current_epoch.number,
1322 self.state.current_view,
1323 &qc.block_hash,
1324 VoteType::Vote2,
1325 );
1326 let signature = self.signer.sign(&vote_bytes);
1327 let vote = Vote {
1328 block_hash: qc.block_hash,
1329 view: self.state.current_view,
1330 validator: self.state.validator_id,
1331 signature,
1332 vote_type: VoteType::Vote2,
1333 extension: vote_extension,
1334 };
1335
1336 self.state.update_locked_qc(&qc);
1338
1339 let next_leader_id =
1340 leader::next_leader(&self.state.validator_set, self.state.current_view);
1341 if next_leader_id == self.state.validator_id {
1342 match self.vote_collector.add_vote(
1344 &self.state.validator_set,
1345 vote,
1346 self.state.current_epoch.number,
1347 ) {
1348 Ok(result) => {
1349 self.handle_equivocation(&result);
1350 if let Some(outer_qc) = result.qc {
1351 self.on_double_cert_formed(outer_qc, result.extensions)
1352 .await;
1353 }
1354 }
1355 Err(e) => warn!(error = %e, "failed to add self vote2"),
1356 }
1357 } else {
1358 self.network
1359 .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
1360 }
1361 }
1362
1363 async fn on_double_cert_formed(
1364 &mut self,
1365 outer_qc: QuorumCertificate,
1366 extensions: Vec<(ValidatorId, Vec<u8>)>,
1367 ) {
1368 let inner_qc = match self.current_view_qc.take() {
1370 Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
1371 _ => {
1372 match &self.state.locked_qc {
1374 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
1375 _ => match &self.state.highest_qc {
1376 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
1377 _ => {
1378 warn!(
1379 validator = %self.state.validator_id,
1380 "double cert formed but can't find matching inner QC"
1381 );
1382 return;
1383 }
1384 },
1385 }
1386 }
1387 };
1388
1389 let dc = DoubleCertificate {
1390 inner_qc,
1391 outer_qc,
1392 vote_extensions: extensions,
1393 };
1394
1395 info!(
1396 validator = %self.state.validator_id,
1397 view = %self.state.current_view,
1398 hash = %dc.inner_qc.block_hash,
1399 "double certificate formed, committing"
1400 );
1401
1402 self.apply_commit(&dc, "double-cert").await;
1404
1405 self.state.highest_double_cert = Some(dc.clone());
1406
1407 self.advance_view(ViewEntryTrigger::DoubleCert(dc)).await;
1409 }
1410
1411 async fn handle_timeout(&mut self) {
1412 let has_power = self.state.validator_set.power_of(self.state.validator_id) > 0;
1416 if !has_power {
1417 self.pacemaker.on_timeout();
1418 return;
1419 }
1420
1421 info!(
1422 validator = %self.state.validator_id,
1423 view = %self.state.current_view,
1424 "view timeout, sending wish"
1425 );
1426
1427 let wish = self.pacemaker.build_wish(
1428 &self.state.chain_id_hash,
1429 self.state.current_epoch.number,
1430 self.state.current_view,
1431 self.state.validator_id,
1432 self.state.highest_qc.clone(),
1433 self.signer.as_ref(),
1434 );
1435
1436 self.network.broadcast(wish.clone());
1437
1438 if let ConsensusMessage::Wish {
1440 target_view,
1441 validator,
1442 highest_qc,
1443 signature,
1444 } = wish
1445 && let Some(tc) = self.pacemaker.add_wish(
1446 &self.state.validator_set,
1447 target_view,
1448 validator,
1449 highest_qc,
1450 signature,
1451 )
1452 {
1453 self.network
1454 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1455 self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1456 return;
1457 }
1458
1459 self.pacemaker.on_timeout();
1461 }
1462
1463 fn process_commit_result(&mut self, result: &CommitResult) {
1468 if result.committed_blocks.is_empty() {
1469 return;
1470 }
1471 {
1472 let mut s = self.store.write();
1473 for (i, block) in result.committed_blocks.iter().enumerate() {
1474 if result.commit_qc.block_hash == block.hash {
1475 s.put_commit_qc(block.height, result.commit_qc.clone());
1476 }
1477 let txs = crate::commit::decode_payload(&block.payload);
1478 for (tx_idx, tx) in txs.iter().enumerate() {
1479 let tx_hash = *blake3::hash(tx).as_bytes();
1480 s.put_tx_index(tx_hash, block.height, tx_idx as u32);
1481 }
1482 if let Some(resp) = result.block_responses.get(i) {
1483 s.put_block_results(block.height, resp.clone());
1484 }
1485 }
1486 s.flush();
1487 }
1488 if let Some(ref mut ev_store) = self.evidence_store {
1489 for block in &result.committed_blocks {
1490 for proof in &block.evidence {
1491 ev_store.mark_committed(proof.view, proof.validator);
1492 }
1493 for proof in ev_store.get_pending() {
1494 if proof.view <= block.view {
1495 ev_store.mark_committed(proof.view, proof.validator);
1496 }
1497 }
1498 }
1499 }
1500 self.liveness_tracker.record_commit(
1501 &self.state.validator_set,
1502 &result.commit_qc.aggregate_signature.signers,
1503 );
1504 self.persist_state();
1505 if let Some(ref mut wal) = self.wal
1506 && let Err(e) = wal.log_commit_done(self.state.last_committed_height)
1507 {
1508 warn!(error = %e, "WAL: failed to log commit done");
1509 }
1510 }
1511
1512 async fn apply_commit(&mut self, dc: &DoubleCertificate, context: &str) {
1515 if let Some(ref mut wal) = self.wal {
1517 let target_height = {
1518 let store = self.store.read();
1519 store.get_block(&dc.inner_qc.block_hash).map(|b| b.height)
1520 };
1521 if let Some(h) = target_height
1522 && let Err(e) = wal.log_commit_intent(h)
1523 {
1524 warn!(error = %e, "WAL: failed to log commit intent");
1525 }
1526 }
1527
1528 let store = self.store.read();
1529 match try_commit(
1530 dc,
1531 store.as_ref(),
1532 self.app.as_ref(),
1533 &mut self.state.last_committed_height,
1534 &self.state.current_epoch,
1535 ) {
1536 Ok(result) => {
1537 if !result.committed_blocks.is_empty() {
1538 self.state.last_app_hash = result.last_app_hash;
1539 }
1540 if result.pending_epoch.is_some() {
1541 self.pending_epoch = result.pending_epoch.clone();
1542 }
1543 drop(store);
1544 self.process_commit_result(&result);
1545 }
1546 Err(e) => {
1547 warn!(error = %e, "try_commit failed during {context}");
1548 drop(store);
1549 }
1550 }
1551 }
1552
1553 fn validate_double_cert(&self, dc: &DoubleCertificate) -> bool {
1565 if dc.inner_qc.block_hash != dc.outer_qc.block_hash {
1566 warn!("double cert inner/outer block_hash mismatch");
1567 return false;
1568 }
1569 let vs = &self.state.validator_set;
1570 let inner_bytes = Vote::signing_bytes(
1571 &self.state.chain_id_hash,
1572 dc.inner_qc.epoch,
1573 dc.inner_qc.view,
1574 &dc.inner_qc.block_hash,
1575 VoteType::Vote,
1576 );
1577 if !self
1578 .verifier
1579 .verify_aggregate(vs, &inner_bytes, &dc.inner_qc.aggregate_signature)
1580 {
1581 warn!("double cert inner QC signature invalid");
1582 return false;
1583 }
1584 if !hotmint_crypto::has_quorum(vs, &dc.inner_qc.aggregate_signature) {
1585 warn!("double cert inner QC below quorum threshold");
1586 return false;
1587 }
1588 let outer_bytes = Vote::signing_bytes(
1589 &self.state.chain_id_hash,
1590 dc.outer_qc.epoch,
1591 dc.outer_qc.view,
1592 &dc.outer_qc.block_hash,
1593 VoteType::Vote2,
1594 );
1595 if !self
1596 .verifier
1597 .verify_aggregate(vs, &outer_bytes, &dc.outer_qc.aggregate_signature)
1598 {
1599 warn!("double cert outer QC signature invalid");
1600 return false;
1601 }
1602 if !hotmint_crypto::has_quorum(vs, &dc.outer_qc.aggregate_signature) {
1603 warn!("double cert outer QC below quorum threshold");
1604 return false;
1605 }
1606 true
1607 }
1608
1609 fn persist_state(&mut self) {
1610 if let Some(p) = self.persistence.as_mut() {
1611 p.save_current_view(self.state.current_view);
1612 if let Some(ref qc) = self.state.locked_qc {
1613 p.save_locked_qc(qc);
1614 }
1615 if let Some(ref qc) = self.state.highest_qc {
1616 p.save_highest_qc(qc);
1617 }
1618 p.save_last_committed_height(self.state.last_committed_height);
1619 p.save_current_epoch(&self.state.current_epoch);
1620 p.save_last_app_hash(self.state.last_app_hash);
1621 p.flush();
1622 }
1623 }
1624
1625 async fn advance_view(&mut self, trigger: ViewEntryTrigger) {
1626 let new_view = match &trigger {
1627 ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
1628 ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
1629 ViewEntryTrigger::Genesis => ViewNumber(1),
1630 };
1631 self.advance_view_to(new_view, trigger).await;
1632 }
1633
1634 async fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
1635 if new_view <= self.state.current_view {
1636 return;
1637 }
1638
1639 let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
1641
1642 if let ViewEntryTrigger::DoubleCert(ref dc) = trigger {
1644 self.state.pending_vote_extensions = dc.vote_extensions.clone();
1645 } else {
1646 self.state.pending_vote_extensions.clear();
1647 }
1648
1649 self.vote_collector.clear_view(self.state.current_view);
1650 self.vote_collector.prune_before(self.state.current_view);
1651 self.pacemaker.clear_view(self.state.current_view);
1652 self.pacemaker.prune_before(self.state.current_view);
1653 self.status_senders.clear();
1654 self.current_view_qc = None;
1655
1656 if self
1660 .pending_epoch
1661 .as_ref()
1662 .is_some_and(|e| new_view >= e.start_view)
1663 {
1664 let Some(new_epoch) = self.pending_epoch.take() else {
1666 unreachable!("pending_epoch was Some in the condition check");
1667 };
1668 info!(
1669 validator = %self.state.validator_id,
1670 old_epoch = %self.state.current_epoch.number,
1671 new_epoch = %new_epoch.number,
1672 start_view = %new_epoch.start_view,
1673 validators = new_epoch.validator_set.validator_count(),
1674 "epoch transition"
1675 );
1676 let offline = self.liveness_tracker.offline_validators();
1678 if !offline.is_empty() {
1679 let evidence: Vec<OfflineEvidence> = offline
1680 .iter()
1681 .map(|&(validator, missed, total)| OfflineEvidence {
1682 validator,
1683 missed_commits: missed,
1684 total_commits: total,
1685 evidence_height: self.state.last_committed_height,
1686 })
1687 .collect();
1688 info!(
1689 offline_count = evidence.len(),
1690 epoch = %self.state.current_epoch.number,
1691 "reporting offline validators"
1692 );
1693 if let Err(e) = self.app.on_offline_validators(&evidence) {
1694 warn!(error = %e, "on_offline_validators callback failed");
1695 }
1696 }
1697 self.liveness_tracker.reset();
1698
1699 self.state.validator_set = new_epoch.validator_set.clone();
1700 self.state.current_epoch = new_epoch;
1701 self.network
1703 .on_epoch_change(self.state.current_epoch.number, &self.state.validator_set);
1704 self.vote_collector = VoteCollector::new();
1706 self.pacemaker = Pacemaker::with_config(self.pacemaker_config.clone());
1707 }
1708
1709 view_protocol::enter_view(
1710 &mut self.state,
1711 new_view,
1712 trigger,
1713 self.network.as_ref(),
1714 self.signer.as_ref(),
1715 );
1716
1717 if is_progress {
1718 self.pacemaker.reset_on_progress();
1719 } else {
1720 self.pacemaker.reset_timer();
1721 }
1722
1723 self.persist_state();
1724
1725 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
1731 self.try_propose().await;
1732 }
1733 }
1734}
1735
1736#[cfg(test)]
1741mod tests {
1742 use super::*;
1743
1744 use std::sync::Arc;
1745
1746 use parking_lot::RwLock;
1747
1748 use hotmint_crypto::{Ed25519Signer, Ed25519Verifier};
1749 use hotmint_types::Signer as SignerTrait;
1750 use hotmint_types::certificate::QuorumCertificate;
1751 use hotmint_types::crypto::AggregateSignature;
1752 use hotmint_types::epoch::EpochNumber;
1753 use hotmint_types::validator::{ValidatorId, ValidatorInfo};
1754 use hotmint_types::vote::{Vote, VoteType};
1755 use tokio::sync::mpsc;
1756
1757 use crate::application::NoopApplication;
1758 use crate::network::NetworkSink;
1759 use crate::state::ConsensusState;
1760 use crate::store::MemoryBlockStore;
1761
1762 struct DevNullNetwork;
1764 impl NetworkSink for DevNullNetwork {
1765 fn broadcast(&self, _: ConsensusMessage) {}
1766 fn send_to(&self, _: ValidatorId, _: ConsensusMessage) {}
1767 }
1768
1769 fn test_chain_id_hash() -> [u8; 32] {
1771 *blake3::hash(b"").as_bytes()
1772 }
1773
1774 fn make_validator_set_4() -> (ValidatorSet, Vec<Ed25519Signer>) {
1775 let signers: Vec<Ed25519Signer> = (0..4)
1776 .map(|i| Ed25519Signer::generate(ValidatorId(i)))
1777 .collect();
1778 let infos: Vec<ValidatorInfo> = signers
1779 .iter()
1780 .map(|s| ValidatorInfo {
1781 id: s.validator_id(),
1782 public_key: s.public_key(),
1783 power: 1,
1784 })
1785 .collect();
1786 (ValidatorSet::new(infos), signers)
1787 }
1788
1789 fn make_test_engine(
1790 vid: ValidatorId,
1791 vs: ValidatorSet,
1792 signer: Ed25519Signer,
1793 ) -> (
1794 ConsensusEngine,
1795 mpsc::Sender<(Option<ValidatorId>, ConsensusMessage)>,
1796 ) {
1797 let (tx, rx) = mpsc::channel(64);
1798 let store = Arc::new(RwLock::new(
1799 Box::new(MemoryBlockStore::new()) as Box<dyn crate::store::BlockStore>
1800 ));
1801 let state = ConsensusState::new(vid, vs);
1802 let engine = ConsensusEngine::new(
1803 state,
1804 store,
1805 Box::new(DevNullNetwork),
1806 Box::new(NoopApplication),
1807 Box::new(signer),
1808 rx,
1809 EngineConfig {
1810 verifier: Box::new(Ed25519Verifier),
1811 pacemaker: None,
1812 persistence: None,
1813 evidence_store: None,
1814 wal: None,
1815 },
1816 );
1817 (engine, tx)
1818 }
1819
/// R-29 regression: a correctly signed `Propose` whose justify QC carries
/// only one signature out of four validators must fail `verify_message`.
#[test]
fn r29_propose_sub_quorum_justify_rejected_by_verify_message() {
    let (vs, signers) = make_validator_set_4();
    let (engine, _tx) = make_test_engine(
        ValidatorId(0),
        vs,
        Ed25519Signer::generate(ValidatorId(0)),
    );

    let chain_id_hash = test_chain_id_hash();
    let epoch = EpochNumber(0);
    let proposer = &signers[1];

    // Justify QC over the genesis block, signed by validator 1 only.
    let justify_hash = BlockHash::GENESIS;
    let justify_view = ViewNumber::GENESIS;
    let vote_bytes = Vote::signing_bytes(
        &chain_id_hash,
        epoch,
        justify_view,
        &justify_hash,
        VoteType::Vote,
    );
    let mut agg = AggregateSignature::new(4);
    agg.add(1, SignerTrait::sign(proposer, &vote_bytes))
        .unwrap();
    let sub_quorum_qc = QuorumCertificate {
        block_hash: justify_hash,
        view: justify_view,
        aggregate_signature: agg,
        epoch,
    };

    // Height-1 block proposed by validator 1 in view 1.
    let mut block = Block::genesis();
    block.height = Height(1);
    block.view = ViewNumber(1);
    block.proposer = ValidatorId(1);
    block.hash = block.compute_hash();

    let proposal_bytes = crate::view_protocol::proposal_signing_bytes(
        &chain_id_hash,
        epoch,
        &block,
        &sub_quorum_qc,
    );
    let signature = SignerTrait::sign(proposer, &proposal_bytes);

    let msg = ConsensusMessage::Propose {
        block: Box::new(block),
        justify: Box::new(sub_quorum_qc),
        double_cert: None,
        signature,
    };

    let accepted = engine.verify_message(&msg);
    assert!(
        !accepted,
        "R-29 regression: Propose with sub-quorum justify QC must be rejected by verify_message"
    );
}
1877
/// R-29 positive case: a correctly signed `Propose` whose justify QC is signed
/// by three of the four validators must pass `verify_message`.
#[test]
fn r29_propose_full_quorum_justify_accepted_by_verify_message() {
    let (vs, signers) = make_validator_set_4();
    let (engine, _tx) = make_test_engine(
        ValidatorId(0),
        vs,
        Ed25519Signer::generate(ValidatorId(0)),
    );

    let chain_id_hash = test_chain_id_hash();
    let epoch = EpochNumber(0);

    // Justify QC over the genesis block, signed by validators 0, 1 and 2.
    let justify_hash = BlockHash::GENESIS;
    let justify_view = ViewNumber::GENESIS;
    let vote_bytes = Vote::signing_bytes(
        &chain_id_hash,
        epoch,
        justify_view,
        &justify_hash,
        VoteType::Vote,
    );
    let mut agg = AggregateSignature::new(4);
    for (idx, voter) in signers.iter().enumerate().take(3) {
        agg.add(idx, SignerTrait::sign(voter, &vote_bytes)).unwrap();
    }
    let full_quorum_qc = QuorumCertificate {
        block_hash: justify_hash,
        view: justify_view,
        aggregate_signature: agg,
        epoch,
    };

    // Height-1 block proposed by validator 1 in view 1.
    let mut block = Block::genesis();
    block.height = Height(1);
    block.view = ViewNumber(1);
    block.proposer = ValidatorId(1);
    block.hash = block.compute_hash();

    let proposal_bytes = crate::view_protocol::proposal_signing_bytes(
        &chain_id_hash,
        epoch,
        &block,
        &full_quorum_qc,
    );
    let signature = SignerTrait::sign(&signers[1], &proposal_bytes);

    let msg = ConsensusMessage::Propose {
        block: Box::new(block),
        justify: Box::new(full_quorum_qc),
        double_cert: None,
        signature,
    };

    let accepted = engine.verify_message(&msg);
    assert!(
        accepted,
        "R-29: Propose with full quorum justify QC must pass verify_message"
    );
}
1932
/// R-32 regression: with four validators of power 1, an aggregate signature
/// carrying one signer must not satisfy `has_quorum`, while one carrying
/// three signers must.
#[test]
fn r32_sub_quorum_highest_qc_fails_has_quorum() {
    let (vs, signers) = make_validator_set_4();

    let chain_id_hash = test_chain_id_hash();
    let hash = BlockHash([1u8; 32]);
    let vote_bytes = Vote::signing_bytes(
        &chain_id_hash,
        EpochNumber(0),
        ViewNumber(1),
        &hash,
        VoteType::Vote,
    );

    // One signer out of four.
    let mut lone = AggregateSignature::new(4);
    lone.add(0, SignerTrait::sign(&signers[0], &vote_bytes))
        .unwrap();
    let lone_has_quorum = hotmint_crypto::has_quorum(&vs, &lone);
    assert!(
        !lone_has_quorum,
        "R-32 regression: 1-of-4 signed QC must not satisfy has_quorum"
    );

    // Three signers out of four.
    let mut majority = AggregateSignature::new(4);
    for (idx, voter) in signers.iter().enumerate().take(3) {
        majority
            .add(idx, SignerTrait::sign(voter, &vote_bytes))
            .unwrap();
    }
    let majority_has_quorum = hotmint_crypto::has_quorum(&vs, &majority);
    assert!(
        majority_has_quorum,
        "R-32: 3-of-4 signed QC must satisfy has_quorum"
    );
}
1977}