1use ruc::*;
2
3use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5
6use crate::application::Application;
7use crate::commit::{CommitResult, try_commit};
8use crate::evidence_store::EvidenceStore;
9use crate::leader;
10use crate::liveness::{LivenessTracker, OfflineEvidence};
11use crate::network::NetworkSink;
12use crate::pacemaker::{Pacemaker, PacemakerConfig};
13use crate::state::{ConsensusState, ViewStep};
14use crate::store::BlockStore;
15use crate::view_protocol::{self, ViewEntryTrigger};
16use crate::vote_collector::VoteCollector;
17
18use hotmint_types::epoch::Epoch;
19use hotmint_types::vote::VoteType;
20use hotmint_types::*;
21use tokio::sync::mpsc;
22use tracing::{error, info, warn};
23
/// Shared handle to the block store: a boxed [`BlockStore`] trait object guarded by a
/// `parking_lot::RwLock` and reference-counted through [`Arc`].
pub type SharedBlockStore = Arc<parking_lot::RwLock<Box<dyn BlockStore>>>;
26
/// Sink for the pieces of consensus state the engine hands over for safekeeping.
///
/// Implementations live outside this file; the per-method notes below are derived
/// from the method names and signatures only.
pub trait StatePersistence: Send {
    /// Record `view` as the current view.
    fn save_current_view(&mut self, view: ViewNumber);
    /// Record `qc` as the locked quorum certificate.
    fn save_locked_qc(&mut self, qc: &QuorumCertificate);
    /// Record `qc` as the highest known quorum certificate.
    fn save_highest_qc(&mut self, qc: &QuorumCertificate);
    /// Record `height` as the last committed height.
    fn save_last_committed_height(&mut self, height: Height);
    /// Record `epoch` as the current epoch.
    fn save_current_epoch(&mut self, epoch: &Epoch);
    /// Record `hash` as the last application hash.
    fn save_last_app_hash(&mut self, hash: BlockHash);
    /// Record the pending epoch; `None` presumably clears it — TODO confirm with implementors.
    fn save_pending_epoch(&mut self, epoch: Option<&Epoch>);
    /// Flush what has been saved so far.
    ///
    /// NOTE(review): takes `&self` while the savers take `&mut self`, so implementations
    /// presumably rely on interior mutability — confirm.
    fn flush(&self);
}
38
/// Write-ahead-log hooks bracketing a commit.
pub trait Wal: Send {
    /// Log the intent to commit up to `target_height`.
    ///
    /// The engine calls this before processing a proposal that carries a double
    /// certificate; a failure there is only logged as a warning.
    fn log_commit_intent(&mut self, target_height: Height) -> std::io::Result<()>;
    /// Log that the commit up to `target_height` has completed.
    ///
    /// NOTE(review): the call site is not in the reviewed part of the file — confirm
    /// when it is invoked relative to `log_commit_intent`.
    fn log_commit_done(&mut self, target_height: Height) -> std::io::Result<()>;
}
46
/// Event-loop state of a single consensus participant.
///
/// Bundles the protocol state with the pluggable components (store, network,
/// application, signer, verifier) and the bookkeeping used by the message handlers.
pub struct ConsensusEngine {
    /// Protocol state: current view and epoch, validator set, QCs, committed height, ...
    state: ConsensusState,
    /// Block store shared behind a read/write lock.
    store: SharedBlockStore,
    /// Outbound message sink (`broadcast`, `send_to`, `broadcast_evidence`).
    network: Box<dyn NetworkSink>,
    /// Application callbacks (`info`, `extend_vote`, `verify_vote_extension`, `on_evidence`, ...).
    app: Box<dyn Application>,
    /// Signs this node's own votes.
    signer: Box<dyn Signer>,
    /// Verifies individual and aggregate signatures on inbound messages.
    verifier: Box<dyn Verifier>,
    /// Accumulates votes; `add_vote` reports a QC once one forms, plus any equivocation.
    vote_collector: VoteCollector,
    /// View timer and wish / timeout-certificate handling.
    pacemaker: Pacemaker,
    /// Copy of the configuration the pacemaker was created with.
    pacemaker_config: PacemakerConfig,
    /// Inbound messages, each paired with an optional sender id.
    msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
    /// Validators whose `StatusCert` the leader has recorded while waiting for status.
    status_senders: HashSet<ValidatorId>,
    /// QC formed in the current view; stored by `on_qc_formed` and taken again when a
    /// double certificate forms.
    current_view_qc: Option<QuorumCertificate>,
    /// Epoch reported as pending by proposal processing (or supplied via `EngineConfig`).
    pending_epoch: Option<Epoch>,
    /// Optional persistence backend.
    persistence: Option<Box<dyn StatePersistence>>,
    /// Optional store for equivocation evidence; also the source of pending evidence
    /// included when proposing.
    evidence_store: Option<Box<dyn EvidenceStore>>,
    /// Liveness tracker, created fresh in `new`.
    liveness_tracker: LivenessTracker,
    /// Optional write-ahead log.
    wal: Option<Box<dyn Wal>>,
    /// Per-sender rate-limit window: (window start, messages counted in that window).
    msg_rate_limiter: HashMap<ValidatorId, (std::time::Instant, u32)>,
}
75
/// Configuration handed to [`ConsensusEngine::new`]: one mandatory verifier plus
/// optional components.
pub struct EngineConfig {
    /// Signature verifier (required).
    pub verifier: Box<dyn Verifier>,
    /// Pacemaker settings; `PacemakerConfig::default()` is used when `None`.
    pub pacemaker: Option<PacemakerConfig>,
    /// Optional persistence backend.
    pub persistence: Option<Box<dyn StatePersistence>>,
    /// Optional evidence store.
    pub evidence_store: Option<Box<dyn EvidenceStore>>,
    /// Optional write-ahead log.
    pub wal: Option<Box<dyn Wal>>,
    /// Initial value for the engine's pending epoch.
    pub pending_epoch: Option<Epoch>,
}
86
87impl EngineConfig {
88 pub fn new(verifier: Box<dyn Verifier>) -> Self {
91 Self {
92 verifier,
93 pacemaker: None,
94 persistence: None,
95 evidence_store: None,
96 wal: None,
97 pending_epoch: None,
98 }
99 }
100
101 pub fn with_pacemaker(mut self, pacemaker: PacemakerConfig) -> Self {
103 self.pacemaker = Some(pacemaker);
104 self
105 }
106
107 pub fn with_persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
109 self.persistence = Some(persistence);
110 self
111 }
112}
113
/// Step-by-step builder for [`ConsensusEngine`].
///
/// The first seven fields are mandatory and checked by `build`; the rest are optional.
pub struct ConsensusEngineBuilder {
    /// Initial consensus state (required).
    state: Option<ConsensusState>,
    /// Shared block store (required).
    store: Option<SharedBlockStore>,
    /// Outbound network sink (required).
    network: Option<Box<dyn NetworkSink>>,
    /// Application callbacks (required).
    app: Option<Box<dyn Application>>,
    /// Signer for this node's messages (required).
    signer: Option<Box<dyn Signer>>,
    /// Inbound message channel (required).
    msg_rx: Option<mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>>,
    /// Signature verifier (required).
    verifier: Option<Box<dyn Verifier>>,
    /// Optional pacemaker settings.
    pacemaker: Option<PacemakerConfig>,
    /// Optional persistence backend.
    persistence: Option<Box<dyn StatePersistence>>,
    /// Optional evidence store.
    evidence_store: Option<Box<dyn EvidenceStore>>,
    /// Optional write-ahead log.
    wal: Option<Box<dyn Wal>>,
}
142
143impl ConsensusEngineBuilder {
144 pub fn new() -> Self {
145 Self {
146 state: None,
147 store: None,
148 network: None,
149 app: None,
150 signer: None,
151 msg_rx: None,
152 verifier: None,
153 pacemaker: None,
154 persistence: None,
155 evidence_store: None,
156 wal: None,
157 }
158 }
159
160 pub fn state(mut self, state: ConsensusState) -> Self {
161 self.state = Some(state);
162 self
163 }
164
165 pub fn store(mut self, store: SharedBlockStore) -> Self {
166 self.store = Some(store);
167 self
168 }
169
170 pub fn network(mut self, network: Box<dyn NetworkSink>) -> Self {
171 self.network = Some(network);
172 self
173 }
174
175 pub fn app(mut self, app: Box<dyn Application>) -> Self {
176 self.app = Some(app);
177 self
178 }
179
180 pub fn signer(mut self, signer: Box<dyn Signer>) -> Self {
181 self.signer = Some(signer);
182 self
183 }
184
185 pub fn messages(
186 mut self,
187 msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
188 ) -> Self {
189 self.msg_rx = Some(msg_rx);
190 self
191 }
192
193 pub fn verifier(mut self, verifier: Box<dyn Verifier>) -> Self {
194 self.verifier = Some(verifier);
195 self
196 }
197
198 pub fn pacemaker(mut self, config: PacemakerConfig) -> Self {
199 self.pacemaker = Some(config);
200 self
201 }
202
203 pub fn persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
204 self.persistence = Some(persistence);
205 self
206 }
207
208 pub fn evidence_store(mut self, store: Box<dyn EvidenceStore>) -> Self {
209 self.evidence_store = Some(store);
210 self
211 }
212
213 pub fn wal(mut self, wal: Box<dyn Wal>) -> Self {
214 self.wal = Some(wal);
215 self
216 }
217
218 pub fn build(self) -> ruc::Result<ConsensusEngine> {
219 let state = self.state.ok_or_else(|| ruc::eg!("state is required"))?;
220 let store = self.store.ok_or_else(|| ruc::eg!("store is required"))?;
221 let network = self
222 .network
223 .ok_or_else(|| ruc::eg!("network is required"))?;
224 let app = self.app.ok_or_else(|| ruc::eg!("app is required"))?;
225 let signer = self.signer.ok_or_else(|| ruc::eg!("signer is required"))?;
226 let msg_rx = self
227 .msg_rx
228 .ok_or_else(|| ruc::eg!("messages (msg_rx) is required"))?;
229 let verifier = self
230 .verifier
231 .ok_or_else(|| ruc::eg!("verifier is required"))?;
232
233 let config = EngineConfig {
234 verifier,
235 pacemaker: self.pacemaker,
236 persistence: self.persistence,
237 evidence_store: self.evidence_store,
238 wal: self.wal,
239 pending_epoch: None,
240 };
241
242 Ok(ConsensusEngine::new(
243 state, store, network, app, signer, msg_rx, config,
244 ))
245 }
246}
247
/// `Default` is the empty builder, identical to [`ConsensusEngineBuilder::new`].
impl Default for ConsensusEngineBuilder {
    fn default() -> Self {
        Self::new()
    }
}
253
254impl ConsensusEngine {
255 pub fn new(
256 state: ConsensusState,
257 store: SharedBlockStore,
258 network: Box<dyn NetworkSink>,
259 app: Box<dyn Application>,
260 signer: Box<dyn Signer>,
261 msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
262 config: EngineConfig,
263 ) -> Self {
264 let pc = config.pacemaker.unwrap_or_default();
265 Self {
266 state,
267 store,
268 network,
269 app,
270 signer,
271 verifier: config.verifier,
272 vote_collector: VoteCollector::new(),
273 pacemaker: Pacemaker::with_config(pc.clone()),
274 pacemaker_config: pc,
275 msg_rx,
276 status_senders: HashSet::new(),
277 current_view_qc: None,
278 pending_epoch: config.pending_epoch,
279 persistence: config.persistence,
280 evidence_store: config.evidence_store,
281 liveness_tracker: LivenessTracker::new(),
282 wal: config.wal,
283 msg_rate_limiter: HashMap::new(),
284 }
285 }
286
287 pub async fn run(mut self) {
290 let app_info = self.app.info();
294 if app_info.last_block_height.as_u64() > 0 {
295 if app_info.last_block_height != self.state.last_committed_height {
296 error!(
297 app_height = app_info.last_block_height.as_u64(),
298 consensus_height = self.state.last_committed_height.as_u64(),
299 "FATAL: application height differs from consensus state — halting"
300 );
301 std::process::exit(1);
302 }
303 if app_info.last_block_app_hash != self.state.last_app_hash {
304 error!(
305 app_hash = %app_info.last_block_app_hash,
306 consensus_hash = %self.state.last_app_hash,
307 "FATAL: application app_hash differs from consensus state — halting"
308 );
309 std::process::exit(1);
310 }
311 }
312
313 if self.state.current_view.as_u64() <= 1 {
314 self.enter_genesis_view().await;
315 } else {
316 info!(
317 validator = %self.state.validator_id,
318 view = %self.state.current_view,
319 height = %self.state.last_committed_height,
320 "resuming from persisted state"
321 );
322 self.pacemaker.reset_timer();
323 }
324
325 loop {
326 let deadline = self.pacemaker.sleep_until_deadline();
327 tokio::pin!(deadline);
328
329 tokio::select! {
330 Some((sender, msg)) = self.msg_rx.recv() => {
331 if let Err(e) = self.handle_message(sender, msg).await {
332 warn!(validator = %self.state.validator_id, error = %e, "error handling message");
333 }
334 }
335 _ = &mut deadline => {
336 self.handle_timeout().await;
337 }
338 }
339 }
340 }
341
342 async fn enter_genesis_view(&mut self) {
343 let genesis_qc = QuorumCertificate {
345 block_hash: BlockHash::GENESIS,
346 view: ViewNumber::GENESIS,
347 aggregate_signature: AggregateSignature::new(
348 self.state.validator_set.validator_count(),
349 ),
350 epoch: self.state.current_epoch.number,
351 };
352 self.state.highest_qc = Some(genesis_qc);
353
354 let view = ViewNumber(1);
355 view_protocol::enter_view(
356 &mut self.state,
357 view,
358 ViewEntryTrigger::Genesis,
359 self.network.as_ref(),
360 self.signer.as_ref(),
361 );
362 self.pacemaker.reset_timer();
363
364 if self.state.is_leader() {
366 self.state.step = ViewStep::WaitingForStatus;
367 self.try_propose().await;
369 }
370 }
371
372 fn try_propose(
373 &mut self,
374 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
375 Box::pin(async move {
376 let pending_evidence = self
378 .evidence_store
379 .as_ref()
380 .map(|s| s.get_pending())
381 .unwrap_or_default();
382
383 let proposed_block = {
385 let mut store = self.store.write();
386 view_protocol::propose(
387 &mut self.state,
388 store.as_mut(),
389 self.network.as_ref(),
390 self.app.as_ref(),
391 self.signer.as_ref(),
392 pending_evidence,
393 )
394 }; match proposed_block {
397 Ok(block) => {
398 self.leader_self_vote(block.hash).await;
400 }
401 Err(e) => {
402 warn!(
403 validator = %self.state.validator_id,
404 error = %e,
405 "failed to propose"
406 );
407 }
408 }
409 })
410 }
411
412 async fn leader_self_vote(&mut self, block_hash: BlockHash) {
413 let vote_bytes = Vote::signing_bytes(
414 &self.state.chain_id_hash,
415 self.state.current_epoch.number,
416 self.state.current_view,
417 &block_hash,
418 VoteType::Vote,
419 );
420 let signature = self.signer.sign(&vote_bytes);
421 let vote = Vote {
422 block_hash,
423 view: self.state.current_view,
424 validator: self.state.validator_id,
425 signature,
426 vote_type: VoteType::Vote,
427 extension: None,
428 };
429 match self.vote_collector.add_vote(
430 &self.state.validator_set,
431 vote,
432 self.state.current_epoch.number,
433 ) {
434 Ok(result) => {
435 self.handle_equivocation(&result);
436 if let Some(qc) = result.qc {
437 self.on_qc_formed(qc).await;
438 }
439 }
440 Err(e) => warn!(error = %e, "failed to add self vote"),
441 }
442 }
443}
444
445pub fn verify_relay_sender(
462 sender: ValidatorId,
463 msg: &ConsensusMessage,
464 validator_keys: &HashMap<ValidatorId, hotmint_types::crypto::PublicKey>,
465 ordered_validators: &[ValidatorId],
466 chain_id_hash: &[u8; 32],
467 epoch: hotmint_types::epoch::EpochNumber,
468) -> bool {
469 use hotmint_crypto::Ed25519Verifier;
470 use hotmint_types::Verifier;
471 use hotmint_types::vote::Vote;
472 let verifier = Ed25519Verifier;
473 match msg {
474 ConsensusMessage::Propose {
475 block,
476 justify,
477 signature,
478 ..
479 } => {
480 let Some(pk) = validator_keys.get(&block.proposer) else {
481 return false;
482 };
483 let bytes =
484 crate::view_protocol::proposal_signing_bytes(chain_id_hash, epoch, block, justify);
485 Verifier::verify(&verifier, pk, &bytes, signature)
486 }
487 ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
488 let Some(pk) = validator_keys.get(&vote.validator) else {
489 return false;
490 };
491 let bytes = Vote::signing_bytes(
492 chain_id_hash,
493 epoch,
494 vote.view,
495 &vote.block_hash,
496 vote.vote_type,
497 );
498 Verifier::verify(&verifier, pk, &bytes, &vote.signature)
499 }
500 ConsensusMessage::Prepare {
501 certificate,
502 signature,
503 } => {
504 if !ordered_validators.is_empty() {
507 let n = ordered_validators.len();
508 let expected_leader = ordered_validators[certificate.view.as_u64() as usize % n];
509 if sender != expected_leader {
510 return false;
511 }
512 }
513 let Some(pk) = validator_keys.get(&sender) else {
514 return false;
515 };
516 let bytes =
517 crate::view_protocol::prepare_signing_bytes(chain_id_hash, epoch, certificate);
518 Verifier::verify(&verifier, pk, &bytes, signature)
519 }
520 ConsensusMessage::Wish {
521 target_view,
522 validator,
523 highest_qc,
524 signature,
525 } => {
526 let Some(pk) = validator_keys.get(validator) else {
527 return false;
528 };
529 let bytes = crate::pacemaker::wish_signing_bytes(
530 chain_id_hash,
531 epoch,
532 *target_view,
533 highest_qc.as_ref(),
534 );
535 Verifier::verify(&verifier, pk, &bytes, signature)
536 }
537 ConsensusMessage::TimeoutCert(tc) => {
538 let target_view = ViewNumber(tc.view.as_u64() + 1);
540 let n = ordered_validators.len();
541 if n == 0 || tc.aggregate_signature.signers.len() != n {
542 return false;
543 }
544 let mut sig_idx = 0usize;
545 let mut verified_count = 0usize;
546 for (i, &signed) in tc.aggregate_signature.signers.iter().enumerate() {
547 if !signed {
548 continue;
549 }
550 if i >= n {
551 return false;
552 }
553 let vid = ordered_validators[i];
554 let Some(pk) = validator_keys.get(&vid) else {
555 return false;
556 };
557 let hqc = tc.highest_qcs.get(i).and_then(|h| h.as_ref());
558 let bytes =
559 crate::pacemaker::wish_signing_bytes(chain_id_hash, epoch, target_view, hqc);
560 if sig_idx >= tc.aggregate_signature.signatures.len() {
561 return false;
562 }
563 if !Verifier::verify(
564 &verifier,
565 pk,
566 &bytes,
567 &tc.aggregate_signature.signatures[sig_idx],
568 ) {
569 return false;
570 }
571 sig_idx += 1;
572 verified_count += 1;
573 }
574 if sig_idx != tc.aggregate_signature.signatures.len() {
575 return false;
576 }
577 verified_count * 3 > n * 2
580 }
581 ConsensusMessage::StatusCert {
582 validator,
583 signature,
584 locked_qc,
585 ..
586 } => {
587 let Some(pk) = validator_keys.get(validator) else {
593 return false;
594 };
595 let _ = (pk, signature, locked_qc);
599 true
600 }
601 ConsensusMessage::Evidence(_) => {
602 validator_keys.contains_key(&sender)
605 }
606 }
607}
608
609impl ConsensusEngine {
610 fn verification_epochs(&self) -> [EpochNumber; 2] {
614 let cur = self.state.current_epoch.number;
615 let prev = if cur.as_u64() > 0 {
616 EpochNumber(cur.as_u64() - 1)
617 } else {
618 cur
619 };
620 [cur, prev]
621 }
622
623 fn verify_message(&self, msg: &ConsensusMessage) -> bool {
631 let msg_view = match msg {
636 ConsensusMessage::Propose { .. } => None, ConsensusMessage::VoteMsg(v) | ConsensusMessage::Vote2Msg(v) => Some(v.view),
638 ConsensusMessage::Prepare { certificate, .. } => Some(certificate.view),
639 ConsensusMessage::Wish { target_view, .. } => Some(*target_view),
640 ConsensusMessage::TimeoutCert(tc) => Some(ViewNumber(tc.view.as_u64() + 1)),
641 ConsensusMessage::StatusCert { .. } => None,
642 ConsensusMessage::Evidence(_) => None, };
644 if let Some(v) = msg_view
645 && v < self.state.current_view
646 {
647 return true; }
649
650 let vs = &self.state.validator_set;
651 match msg {
652 ConsensusMessage::Propose {
653 block,
654 justify,
655 signature,
656 ..
657 } => {
658 let proposer = vs.get(block.proposer);
659 let Some(vi) = proposer else {
660 warn!(proposer = %block.proposer, "propose from unknown validator");
661 return false;
662 };
663 let mut proposal_ok = false;
664 for epoch in self.verification_epochs() {
665 let bytes = view_protocol::proposal_signing_bytes(
666 &self.state.chain_id_hash,
667 epoch,
668 block,
669 justify,
670 );
671 if self.verifier.verify(&vi.public_key, &bytes, signature) {
672 proposal_ok = true;
673 break;
674 }
675 }
676 if !proposal_ok {
677 warn!(proposer = %block.proposer, "invalid proposal signature");
678 return false;
679 }
680 if justify.aggregate_signature.count() > 0 {
682 let qc_bytes = Vote::signing_bytes(
683 &self.state.chain_id_hash,
684 justify.epoch,
685 justify.view,
686 &justify.block_hash,
687 VoteType::Vote,
688 );
689 if !self
690 .verifier
691 .verify_aggregate(vs, &qc_bytes, &justify.aggregate_signature)
692 {
693 warn!(proposer = %block.proposer, "invalid justify QC aggregate signature");
694 return false;
695 }
696 if justify.epoch == self.state.current_epoch.number
697 && !hotmint_crypto::has_quorum(vs, &justify.aggregate_signature)
698 {
699 warn!(proposer = %block.proposer, "justify QC below quorum threshold");
700 return false;
701 }
702 }
703 true
704 }
705 ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
706 let Some(vi) = vs.get(vote.validator) else {
707 warn!(validator = %vote.validator, "vote from unknown validator");
708 return false;
709 };
710 let mut ok = false;
711 for epoch in self.verification_epochs() {
712 let bytes = Vote::signing_bytes(
713 &self.state.chain_id_hash,
714 epoch,
715 vote.view,
716 &vote.block_hash,
717 vote.vote_type,
718 );
719 if self
720 .verifier
721 .verify(&vi.public_key, &bytes, &vote.signature)
722 {
723 ok = true;
724 break;
725 }
726 }
727 if !ok {
728 warn!(validator = %vote.validator, "invalid vote signature");
729 return false;
730 }
731 true
732 }
733 ConsensusMessage::Prepare {
734 certificate,
735 signature,
736 } => {
737 let Some(leader) = vs.leader_for_view(certificate.view) else {
739 return false;
740 };
741 let mut prepare_ok = false;
742 for epoch in self.verification_epochs() {
743 let bytes = view_protocol::prepare_signing_bytes(
744 &self.state.chain_id_hash,
745 epoch,
746 certificate,
747 );
748 if self.verifier.verify(&leader.public_key, &bytes, signature) {
749 prepare_ok = true;
750 break;
751 }
752 }
753 if !prepare_ok {
754 warn!(view = %certificate.view, "invalid prepare signature");
755 return false;
756 }
757 let qc_bytes = Vote::signing_bytes(
759 &self.state.chain_id_hash,
760 certificate.epoch,
761 certificate.view,
762 &certificate.block_hash,
763 VoteType::Vote,
764 );
765 if !self
766 .verifier
767 .verify_aggregate(vs, &qc_bytes, &certificate.aggregate_signature)
768 {
769 warn!(view = %certificate.view, "invalid QC aggregate signature");
770 return false;
771 }
772 if certificate.epoch == self.state.current_epoch.number
773 && !hotmint_crypto::has_quorum(vs, &certificate.aggregate_signature)
774 {
775 warn!(view = %certificate.view, "Prepare QC below quorum threshold");
776 return false;
777 }
778 true
779 }
780 ConsensusMessage::Wish {
781 target_view,
782 validator,
783 highest_qc,
784 signature,
785 } => {
786 let Some(vi) = vs.get(*validator) else {
787 warn!(validator = %validator, "wish from unknown validator");
788 return false;
789 };
790 let mut wish_ok = false;
792 for epoch in self.verification_epochs() {
793 let bytes = crate::pacemaker::wish_signing_bytes(
794 &self.state.chain_id_hash,
795 epoch,
796 *target_view,
797 highest_qc.as_ref(),
798 );
799 if self.verifier.verify(&vi.public_key, &bytes, signature) {
800 wish_ok = true;
801 break;
802 }
803 }
804 if !wish_ok {
805 warn!(validator = %validator, "invalid wish signature");
806 return false;
807 }
808 true
809 }
810 ConsensusMessage::TimeoutCert(tc) => {
811 let target_view = ViewNumber(tc.view.as_u64() + 1);
817 let n = vs.validator_count();
818 if tc.aggregate_signature.signers.len() != n {
819 warn!(view = %tc.view, "TC signers bitfield length mismatch");
820 return false;
821 }
822 let mut sig_idx = 0usize;
823 let mut power = 0u64;
824 for (i, &signed) in tc.aggregate_signature.signers.iter().enumerate() {
825 if !signed {
826 continue;
827 }
828 let Some(vi) = vs.validators().get(i) else {
829 warn!(view = %tc.view, validator_idx = i, "TC signer index out of validator set");
830 return false;
831 };
832 let hqc = tc.highest_qcs.get(i).and_then(|h| h.as_ref());
833 if sig_idx >= tc.aggregate_signature.signatures.len() {
834 warn!(view = %tc.view, "TC aggregate_signature has fewer sigs than signers");
835 return false;
836 }
837 let mut tc_sig_ok = false;
838 for epoch in self.verification_epochs() {
839 let bytes = crate::pacemaker::wish_signing_bytes(
840 &self.state.chain_id_hash,
841 epoch,
842 target_view,
843 hqc,
844 );
845 if self.verifier.verify(
846 &vi.public_key,
847 &bytes,
848 &tc.aggregate_signature.signatures[sig_idx],
849 ) {
850 tc_sig_ok = true;
851 break;
852 }
853 }
854 if !tc_sig_ok {
855 warn!(view = %tc.view, validator = %vi.id, "TC signer signature invalid");
856 return false;
857 }
858 power += vs.power_of(vi.id);
859 sig_idx += 1;
860 }
861 if sig_idx != tc.aggregate_signature.signatures.len() {
862 warn!(view = %tc.view, "TC has extra signatures beyond bitfield");
863 return false;
864 }
865 if power < vs.quorum_threshold() {
866 warn!(view = %tc.view, power, threshold = vs.quorum_threshold(), "TC insufficient quorum");
867 return false;
868 }
869 true
870 }
871 ConsensusMessage::StatusCert {
872 locked_qc,
873 validator,
874 signature,
875 } => {
876 let Some(vi) = vs.get(*validator) else {
877 warn!(validator = %validator, "status from unknown validator");
878 return false;
879 };
880 let mut status_ok = false;
881 for epoch in self.verification_epochs() {
882 let bytes = view_protocol::status_signing_bytes(
883 &self.state.chain_id_hash,
884 epoch,
885 self.state.current_view,
886 locked_qc,
887 );
888 if self.verifier.verify(&vi.public_key, &bytes, signature) {
889 status_ok = true;
890 break;
891 }
892 }
893 if !status_ok {
894 warn!(validator = %validator, "invalid status signature");
895 return false;
896 }
897 true
898 }
899 ConsensusMessage::Evidence(_) => {
900 true
904 }
905 }
906 }
907
908 async fn handle_message(
909 &mut self,
910 sender: Option<ValidatorId>,
911 msg: ConsensusMessage,
912 ) -> Result<()> {
913 const MAX_MSG_PER_SEC: u32 = 100;
916 if let Some(vid) = sender {
917 let now = std::time::Instant::now();
918 let entry = self.msg_rate_limiter.entry(vid).or_insert((now, 0));
919 if now.duration_since(entry.0).as_secs() >= 1 {
920 *entry = (now, 1);
921 } else {
922 entry.1 += 1;
923 if entry.1 > MAX_MSG_PER_SEC {
924 return Ok(());
925 }
926 }
927 }
928
929 let verified = tokio::task::block_in_place(|| self.verify_message(&msg));
934 if !verified {
935 return Ok(());
936 }
937
938 match msg {
939 ConsensusMessage::Propose {
940 block,
941 justify,
942 double_cert,
943 signature: _,
944 } => {
945 let block = *block;
946 let justify = *justify;
947 let double_cert = double_cert.map(|dc| *dc);
948
949 if block.view > self.state.current_view {
951 if let Some(ref dc) = double_cert {
952 if !tokio::task::block_in_place(|| self.validate_double_cert(dc)) {
953 return Ok(());
954 }
955
956 self.apply_commit(dc, "fast-forward").await;
958 self.state.highest_double_cert = Some(dc.clone());
959 self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()))
960 .await;
961 } else {
962 return Ok(());
963 }
964 } else if block.view < self.state.current_view {
965 if block.height > self.state.last_committed_height {
971 let expected = hotmint_crypto::compute_block_hash(&block);
973 if block.hash == expected {
974 let mut store = self.store.write();
975 store.put_block(block);
976 }
977 }
978 return Ok(());
979 }
980
981 let mut store = self.store.write();
982
983 if let Some(ref dc) = double_cert
988 && !tokio::task::block_in_place(|| self.validate_double_cert(dc))
989 {
990 return Ok(());
991 }
992
993 if justify.aggregate_signature.count() > 0
999 && let Some(justified_block) = store.get_block(&justify.block_hash)
1000 && store.get_commit_qc(justified_block.height).is_none()
1001 {
1002 store.put_commit_qc(justified_block.height, justify.clone());
1003 }
1004
1005 if let Some(ref dc) = double_cert
1007 && let Some(ref mut wal) = self.wal
1008 && let Some(target_block) = store.get_block(&dc.inner_qc.block_hash)
1009 && let Err(e) = wal.log_commit_intent(target_block.height)
1010 {
1011 warn!(error = %e, "WAL: failed to log commit intent for fast-forward");
1012 }
1013
1014 let proposal_result = view_protocol::on_proposal(
1015 &mut self.state,
1016 view_protocol::ProposalData {
1017 block,
1018 justify,
1019 double_cert,
1020 },
1021 store.as_mut(),
1022 self.network.as_ref(),
1023 self.app.as_ref(),
1024 self.signer.as_ref(),
1025 )
1026 .c(d!())?;
1027 drop(store);
1028
1029 if let Some(result) = proposal_result.commit_result {
1032 self.process_commit_result(&result);
1033 }
1034 if let Some(epoch) = proposal_result.pending_epoch {
1035 self.pending_epoch = Some(epoch);
1036 }
1037 }
1038
1039 ConsensusMessage::VoteMsg(vote) => {
1040 if vote.view != self.state.current_view {
1041 return Ok(());
1042 }
1043 if !self.state.is_leader() {
1044 return Ok(());
1045 }
1046 if vote.vote_type != VoteType::Vote {
1047 return Ok(());
1048 }
1049
1050 let result = self
1051 .vote_collector
1052 .add_vote(
1053 &self.state.validator_set,
1054 vote,
1055 self.state.current_epoch.number,
1056 )
1057 .c(d!())?;
1058 self.handle_equivocation(&result);
1059 if let Some(qc) = result.qc {
1060 self.on_qc_formed(qc).await;
1061 }
1062 }
1063
1064 ConsensusMessage::Prepare {
1065 certificate,
1066 signature: _,
1067 } => {
1068 if certificate.view < self.state.current_view {
1069 return Ok(());
1070 }
1071 if certificate.view == self.state.current_view {
1072 let store = self.store.read();
1077 let block_opt = store.get_block(&certificate.block_hash);
1078 if self.app.tracks_app_hash()
1079 && let Some(ref block) = block_opt
1080 && block.app_hash != self.state.last_app_hash
1081 {
1082 warn!(
1083 block_app_hash = %block.app_hash,
1084 local_app_hash = %self.state.last_app_hash,
1085 "prepare block app_hash mismatch, ignoring"
1086 );
1087 return Ok(());
1088 }
1089
1090 let vote_extension = block_opt.and_then(|block| {
1093 let ctx = BlockContext {
1094 height: block.height,
1095 view: self.state.current_view,
1096 proposer: block.proposer,
1097 epoch: self.state.current_epoch.number,
1098 epoch_start_view: self.state.current_epoch.start_view,
1099 validator_set: &self.state.validator_set,
1100 timestamp: block.timestamp,
1101 vote_extensions: vec![],
1102 };
1103 self.app.extend_vote(&block, &ctx)
1104 });
1105 drop(store);
1106
1107 view_protocol::on_prepare(
1108 &mut self.state,
1109 certificate,
1110 self.network.as_ref(),
1111 self.signer.as_ref(),
1112 vote_extension,
1113 );
1114 }
1115 }
1116
1117 ConsensusMessage::Vote2Msg(vote) => {
1118 if vote.view != self.state.current_view {
1119 return Ok(());
1120 }
1121 if vote.vote_type != VoteType::Vote2 {
1122 return Ok(());
1123 }
1124
1125 if let Some(ref ext) = vote.extension
1127 && !self
1128 .app
1129 .verify_vote_extension(ext, &vote.block_hash, vote.validator)
1130 {
1131 warn!(
1132 validator = %vote.validator,
1133 view = %vote.view,
1134 "rejecting vote2: invalid vote extension"
1135 );
1136 return Ok(());
1137 }
1138
1139 let result = self
1140 .vote_collector
1141 .add_vote(
1142 &self.state.validator_set,
1143 vote,
1144 self.state.current_epoch.number,
1145 )
1146 .c(d!())?;
1147 self.handle_equivocation(&result);
1148 if let Some(outer_qc) = result.qc {
1149 self.on_double_cert_formed(outer_qc, result.extensions)
1150 .await;
1151 }
1152 }
1153
1154 ConsensusMessage::Wish {
1155 target_view,
1156 validator,
1157 highest_qc,
1158 signature,
1159 } => {
1160 if let Some(ref qc) = highest_qc
1163 && qc.aggregate_signature.count() > 0
1164 {
1165 let qc_bytes = Vote::signing_bytes(
1166 &self.state.chain_id_hash,
1167 qc.epoch,
1168 qc.view,
1169 &qc.block_hash,
1170 VoteType::Vote,
1171 );
1172 if !tokio::task::block_in_place(|| {
1173 self.verifier.verify_aggregate(
1174 &self.state.validator_set,
1175 &qc_bytes,
1176 &qc.aggregate_signature,
1177 )
1178 }) {
1179 warn!(validator = %validator, "wish carries invalid highest_qc signature");
1180 return Ok(());
1181 }
1182 if qc.epoch == self.state.current_epoch.number
1188 && !hotmint_crypto::has_quorum(
1189 &self.state.validator_set,
1190 &qc.aggregate_signature,
1191 )
1192 {
1193 warn!(validator = %validator, "wish carries highest_qc without quorum");
1194 return Ok(());
1195 }
1196 }
1197
1198 if let Some(tc) = self.pacemaker.add_wish(
1199 &self.state.validator_set,
1200 target_view,
1201 validator,
1202 highest_qc,
1203 signature,
1204 ) {
1205 info!(
1206 validator = %self.state.validator_id,
1207 view = %tc.view,
1208 "TC formed, advancing view"
1209 );
1210 self.network
1211 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1212 self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1213 }
1214 }
1215
1216 ConsensusMessage::TimeoutCert(tc) => {
1217 if self.pacemaker.should_relay_tc(&tc) {
1218 self.network
1219 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1220 }
1221 let new_view = ViewNumber(tc.view.as_u64() + 1);
1222 if new_view > self.state.current_view {
1223 self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1224 }
1225 }
1226
1227 ConsensusMessage::StatusCert {
1228 locked_qc,
1229 validator,
1230 signature: _,
1231 } => {
1232 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
1233 if let Some(ref qc) = locked_qc {
1234 self.state.update_highest_qc(qc);
1235 }
1236 self.status_senders.insert(validator);
1237 let status_power: u64 = self
1238 .status_senders
1239 .iter()
1240 .map(|v| self.state.validator_set.power_of(*v))
1241 .sum();
1242 let total_power =
1244 status_power + self.state.validator_set.power_of(self.state.validator_id);
1245 if total_power >= self.state.validator_set.quorum_threshold() {
1246 self.try_propose().await;
1247 }
1248 }
1249 }
1250
1251 ConsensusMessage::Evidence(proof) => {
1252 let vs = &self.state.validator_set;
1256 let vi = match vs.get(proof.validator) {
1257 Some(vi) => vi,
1258 None => {
1259 warn!(validator = %proof.validator, "evidence for unknown validator");
1260 return Ok(());
1261 }
1262 };
1263 if proof.block_hash_a == proof.block_hash_b {
1264 warn!(validator = %proof.validator, "evidence has identical block hashes");
1265 return Ok(());
1266 }
1267 let bytes_a = Vote::signing_bytes(
1270 &self.state.chain_id_hash,
1271 proof.epoch,
1272 proof.view,
1273 &proof.block_hash_a,
1274 proof.vote_type,
1275 );
1276 let bytes_b = Vote::signing_bytes(
1277 &self.state.chain_id_hash,
1278 proof.epoch,
1279 proof.view,
1280 &proof.block_hash_b,
1281 proof.vote_type,
1282 );
1283 if !self
1284 .verifier
1285 .verify(&vi.public_key, &bytes_a, &proof.signature_a)
1286 || !self
1287 .verifier
1288 .verify(&vi.public_key, &bytes_b, &proof.signature_b)
1289 {
1290 warn!(validator = %proof.validator, "evidence has invalid signatures");
1291 return Ok(());
1292 }
1293
1294 info!(
1295 validator = %proof.validator,
1296 view = %proof.view,
1297 "received valid evidence gossip"
1298 );
1299 if let Err(e) = self.app.on_evidence(&proof) {
1300 warn!(error = %e, "on_evidence callback failed for gossiped proof");
1301 }
1302 if let Some(ref mut store) = self.evidence_store {
1303 store.put_evidence(proof);
1304 }
1305 }
1306 }
1307 Ok(())
1308 }
1309
1310 fn handle_equivocation(&mut self, result: &crate::vote_collector::VoteResult) {
1311 if let Some(ref proof) = result.equivocation {
1312 warn!(
1313 validator = %proof.validator,
1314 view = %proof.view,
1315 "equivocation detected!"
1316 );
1317 if let Err(e) = self.app.on_evidence(proof) {
1318 warn!(error = %e, "on_evidence callback failed");
1319 }
1320 self.network.broadcast_evidence(proof);
1321 if let Some(ref mut store) = self.evidence_store {
1322 store.put_evidence(proof.clone());
1323 }
1324 }
1325 }
1326
1327 async fn on_qc_formed(&mut self, qc: QuorumCertificate) {
1328 self.current_view_qc = Some(qc.clone());
1330
1331 view_protocol::on_votes_collected(
1332 &mut self.state,
1333 qc.clone(),
1334 self.network.as_ref(),
1335 self.signer.as_ref(),
1336 );
1337
1338 let vote_extension = {
1341 let store = self.store.read();
1342 store.get_block(&qc.block_hash).and_then(|block| {
1343 let ctx = BlockContext {
1344 height: block.height,
1345 view: self.state.current_view,
1346 proposer: block.proposer,
1347 epoch: self.state.current_epoch.number,
1348 epoch_start_view: self.state.current_epoch.start_view,
1349 validator_set: &self.state.validator_set,
1350 timestamp: block.timestamp,
1351 vote_extensions: vec![],
1352 };
1353 self.app.extend_vote(&block, &ctx)
1354 })
1355 };
1356 let vote_bytes = Vote::signing_bytes(
1357 &self.state.chain_id_hash,
1358 self.state.current_epoch.number,
1359 self.state.current_view,
1360 &qc.block_hash,
1361 VoteType::Vote2,
1362 );
1363 let signature = self.signer.sign(&vote_bytes);
1364 let vote = Vote {
1365 block_hash: qc.block_hash,
1366 view: self.state.current_view,
1367 validator: self.state.validator_id,
1368 signature,
1369 vote_type: VoteType::Vote2,
1370 extension: vote_extension,
1371 };
1372
1373 self.state.update_locked_qc(&qc);
1375
1376 let next_leader_id =
1377 leader::next_leader(&self.state.validator_set, self.state.current_view);
1378 if next_leader_id == self.state.validator_id {
1379 match self.vote_collector.add_vote(
1381 &self.state.validator_set,
1382 vote,
1383 self.state.current_epoch.number,
1384 ) {
1385 Ok(result) => {
1386 self.handle_equivocation(&result);
1387 if let Some(outer_qc) = result.qc {
1388 self.on_double_cert_formed(outer_qc, result.extensions)
1389 .await;
1390 }
1391 }
1392 Err(e) => warn!(error = %e, "failed to add self vote2"),
1393 }
1394 } else {
1395 self.network
1396 .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
1397 }
1398 }
1399
1400 async fn on_double_cert_formed(
1401 &mut self,
1402 outer_qc: QuorumCertificate,
1403 extensions: Vec<(ValidatorId, Vec<u8>)>,
1404 ) {
1405 let inner_qc = match self.current_view_qc.take() {
1407 Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
1408 _ => {
1409 match &self.state.locked_qc {
1411 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
1412 _ => match &self.state.highest_qc {
1413 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
1414 _ => {
1415 warn!(
1416 validator = %self.state.validator_id,
1417 "double cert formed but can't find matching inner QC"
1418 );
1419 return;
1420 }
1421 },
1422 }
1423 }
1424 };
1425
1426 let dc = DoubleCertificate {
1427 inner_qc,
1428 outer_qc,
1429 vote_extensions: extensions,
1430 };
1431
1432 info!(
1433 validator = %self.state.validator_id,
1434 view = %self.state.current_view,
1435 hash = %dc.inner_qc.block_hash,
1436 "double certificate formed, committing"
1437 );
1438
1439 self.apply_commit(&dc, "double-cert").await;
1441
1442 self.state.highest_double_cert = Some(dc.clone());
1443
1444 self.advance_view(ViewEntryTrigger::DoubleCert(dc)).await;
1446 }
1447
1448 async fn handle_timeout(&mut self) {
1449 let has_power = self.state.validator_set.power_of(self.state.validator_id) > 0;
1453 if !has_power {
1454 self.pacemaker.on_timeout();
1455 return;
1456 }
1457
1458 info!(
1459 validator = %self.state.validator_id,
1460 view = %self.state.current_view,
1461 "view timeout, sending wish"
1462 );
1463
1464 let wish = self.pacemaker.build_wish(
1465 &self.state.chain_id_hash,
1466 self.state.current_epoch.number,
1467 self.state.current_view,
1468 self.state.validator_id,
1469 self.state.highest_qc.clone(),
1470 self.signer.as_ref(),
1471 );
1472
1473 self.network.broadcast(wish.clone());
1474
1475 if let ConsensusMessage::Wish {
1477 target_view,
1478 validator,
1479 highest_qc,
1480 signature,
1481 } = wish
1482 && let Some(tc) = self.pacemaker.add_wish(
1483 &self.state.validator_set,
1484 target_view,
1485 validator,
1486 highest_qc,
1487 signature,
1488 )
1489 {
1490 self.network
1491 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1492 self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1493 return;
1494 }
1495
1496 self.pacemaker.on_timeout();
1498 }
1499
1500 fn process_commit_result(&mut self, result: &CommitResult) {
1505 if result.committed_blocks.is_empty() {
1506 return;
1507 }
1508 {
1509 let mut s = self.store.write();
1510 for (i, block) in result.committed_blocks.iter().enumerate() {
1511 if result.commit_qc.block_hash == block.hash {
1512 s.put_commit_qc(block.height, result.commit_qc.clone());
1513 }
1514 let txs = crate::commit::decode_payload(&block.payload);
1515 for (tx_idx, tx) in txs.iter().enumerate() {
1516 let tx_hash = *blake3::hash(tx).as_bytes();
1517 s.put_tx_index(tx_hash, block.height, tx_idx as u32);
1518 }
1519 if let Some(resp) = result.block_responses.get(i) {
1520 s.put_block_results(block.height, resp.clone());
1521 }
1522 }
1523 s.flush();
1524 }
1525 if let Some(ref mut ev_store) = self.evidence_store {
1526 for block in &result.committed_blocks {
1527 for proof in &block.evidence {
1528 ev_store.mark_committed(proof.view, proof.validator);
1529 }
1530 for proof in ev_store.get_pending() {
1531 if proof.view <= block.view {
1532 ev_store.mark_committed(proof.view, proof.validator);
1533 }
1534 }
1535 }
1536 }
1537 self.liveness_tracker.record_commit(
1538 &self.state.validator_set,
1539 &result.commit_qc.aggregate_signature.signers,
1540 );
1541 self.persist_state();
1542 if let Some(ref mut wal) = self.wal
1543 && let Err(e) = wal.log_commit_done(self.state.last_committed_height)
1544 {
1545 warn!(error = %e, "WAL: failed to log commit done");
1546 }
1547 }
1548
1549 async fn apply_commit(&mut self, dc: &DoubleCertificate, context: &str) {
1552 if let Some(ref mut wal) = self.wal {
1554 let target_height = {
1555 let store = self.store.read();
1556 store.get_block(&dc.inner_qc.block_hash).map(|b| b.height)
1557 };
1558 if let Some(h) = target_height
1559 && let Err(e) = wal.log_commit_intent(h)
1560 {
1561 warn!(error = %e, "WAL: failed to log commit intent");
1562 }
1563 }
1564
1565 let store = self.store.read();
1566 match try_commit(
1567 dc,
1568 store.as_ref(),
1569 self.app.as_ref(),
1570 &mut self.state.last_committed_height,
1571 &self.state.current_epoch,
1572 ) {
1573 Ok(result) => {
1574 if !result.committed_blocks.is_empty() {
1575 self.state.last_app_hash = result.last_app_hash;
1576 }
1577 if result.pending_epoch.is_some() {
1578 self.pending_epoch = result.pending_epoch.clone();
1579 }
1580 drop(store);
1581 self.process_commit_result(&result);
1582 }
1583 Err(e) => {
1584 warn!(error = %e, "try_commit failed during {context}");
1585 drop(store);
1586 }
1587 }
1588 }
1589
1590 fn validate_double_cert(&self, dc: &DoubleCertificate) -> bool {
1602 if dc.inner_qc.block_hash != dc.outer_qc.block_hash {
1603 warn!("double cert inner/outer block_hash mismatch");
1604 return false;
1605 }
1606 let vs = &self.state.validator_set;
1607 let inner_bytes = Vote::signing_bytes(
1608 &self.state.chain_id_hash,
1609 dc.inner_qc.epoch,
1610 dc.inner_qc.view,
1611 &dc.inner_qc.block_hash,
1612 VoteType::Vote,
1613 );
1614 if !self
1615 .verifier
1616 .verify_aggregate(vs, &inner_bytes, &dc.inner_qc.aggregate_signature)
1617 {
1618 warn!("double cert inner QC signature invalid");
1619 return false;
1620 }
1621 if !hotmint_crypto::has_quorum(vs, &dc.inner_qc.aggregate_signature) {
1622 warn!("double cert inner QC below quorum threshold");
1623 return false;
1624 }
1625 let outer_bytes = Vote::signing_bytes(
1626 &self.state.chain_id_hash,
1627 dc.outer_qc.epoch,
1628 dc.outer_qc.view,
1629 &dc.outer_qc.block_hash,
1630 VoteType::Vote2,
1631 );
1632 if !self
1633 .verifier
1634 .verify_aggregate(vs, &outer_bytes, &dc.outer_qc.aggregate_signature)
1635 {
1636 warn!("double cert outer QC signature invalid");
1637 return false;
1638 }
1639 if !hotmint_crypto::has_quorum(vs, &dc.outer_qc.aggregate_signature) {
1640 warn!("double cert outer QC below quorum threshold");
1641 return false;
1642 }
1643 true
1644 }
1645
1646 fn persist_state(&mut self) {
1647 if let Some(p) = self.persistence.as_mut() {
1648 p.save_current_view(self.state.current_view);
1649 if let Some(ref qc) = self.state.locked_qc {
1650 p.save_locked_qc(qc);
1651 }
1652 if let Some(ref qc) = self.state.highest_qc {
1653 p.save_highest_qc(qc);
1654 }
1655 p.save_last_committed_height(self.state.last_committed_height);
1656 p.save_current_epoch(&self.state.current_epoch);
1657 p.save_last_app_hash(self.state.last_app_hash);
1658 p.save_pending_epoch(self.pending_epoch.as_ref());
1659 p.flush();
1660 }
1661 }
1662
1663 async fn advance_view(&mut self, trigger: ViewEntryTrigger) {
1664 let new_view = match &trigger {
1665 ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
1666 ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
1667 ViewEntryTrigger::Genesis => ViewNumber(1),
1668 };
1669 self.advance_view_to(new_view, trigger).await;
1670 }
1671
1672 async fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
1673 if new_view <= self.state.current_view {
1674 return;
1675 }
1676
1677 let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
1679
1680 if let ViewEntryTrigger::DoubleCert(ref dc) = trigger {
1682 self.state.pending_vote_extensions = dc.vote_extensions.clone();
1683 } else {
1684 self.state.pending_vote_extensions.clear();
1685 }
1686
1687 self.vote_collector.clear_view(self.state.current_view);
1688 self.vote_collector.prune_before(self.state.current_view);
1689 self.pacemaker.clear_view(self.state.current_view);
1690 self.pacemaker.prune_before(self.state.current_view);
1691 self.status_senders.clear();
1692 self.current_view_qc = None;
1693
1694 if self
1698 .pending_epoch
1699 .as_ref()
1700 .is_some_and(|e| new_view >= e.start_view)
1701 {
1702 let Some(new_epoch) = self.pending_epoch.take() else {
1704 unreachable!("pending_epoch was Some in the condition check");
1705 };
1706 info!(
1707 validator = %self.state.validator_id,
1708 old_epoch = %self.state.current_epoch.number,
1709 new_epoch = %new_epoch.number,
1710 start_view = %new_epoch.start_view,
1711 validators = new_epoch.validator_set.validator_count(),
1712 "epoch transition"
1713 );
1714 let offline = self.liveness_tracker.offline_validators();
1716 if !offline.is_empty() {
1717 let evidence: Vec<OfflineEvidence> = offline
1718 .iter()
1719 .map(|&(validator, missed, total)| OfflineEvidence {
1720 validator,
1721 missed_commits: missed,
1722 total_commits: total,
1723 evidence_height: self.state.last_committed_height,
1724 })
1725 .collect();
1726 info!(
1727 offline_count = evidence.len(),
1728 epoch = %self.state.current_epoch.number,
1729 "reporting offline validators"
1730 );
1731 if let Err(e) = self.app.on_offline_validators(&evidence) {
1732 warn!(error = %e, "on_offline_validators callback failed");
1733 }
1734 }
1735 self.liveness_tracker.reset();
1736
1737 self.state.validator_set = new_epoch.validator_set.clone();
1738 self.state.current_epoch = new_epoch;
1739 self.network
1741 .on_epoch_change(self.state.current_epoch.number, &self.state.validator_set);
1742 self.vote_collector = VoteCollector::new();
1744 self.pacemaker = Pacemaker::with_config(self.pacemaker_config.clone());
1745 }
1746
1747 view_protocol::enter_view(
1748 &mut self.state,
1749 new_view,
1750 trigger,
1751 self.network.as_ref(),
1752 self.signer.as_ref(),
1753 );
1754
1755 if is_progress {
1756 self.pacemaker.reset_on_progress();
1757 } else {
1758 self.pacemaker.reset_timer();
1759 }
1760
1761 self.persist_state();
1762
1763 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
1769 self.try_propose().await;
1770 }
1771 }
1772}
1773
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;

    use parking_lot::RwLock;

    use hotmint_crypto::{Ed25519Signer, Ed25519Verifier};
    use hotmint_types::Signer as SignerTrait;
    use hotmint_types::certificate::QuorumCertificate;
    use hotmint_types::crypto::AggregateSignature;
    use hotmint_types::epoch::EpochNumber;
    use hotmint_types::validator::{ValidatorId, ValidatorInfo};
    use hotmint_types::vote::{Vote, VoteType};
    use tokio::sync::mpsc;

    use crate::application::NoopApplication;
    use crate::network::NetworkSink;
    use crate::state::ConsensusState;
    use crate::store::MemoryBlockStore;

    /// Network sink that discards every message.
    struct DevNullNetwork;

    impl NetworkSink for DevNullNetwork {
        fn broadcast(&self, _: ConsensusMessage) {}
        fn send_to(&self, _: ValidatorId, _: ConsensusMessage) {}
    }

    /// Chain-id hash used when signing test messages: blake3 of the empty
    /// byte string.
    fn test_chain_id_hash() -> [u8; 32] {
        let digest = blake3::hash(b"");
        *digest.as_bytes()
    }

    /// Creates four freshly generated signers (ids 0..4) and a validator set
    /// in which each of them has power 1.
    fn make_validator_set_4() -> (ValidatorSet, Vec<Ed25519Signer>) {
        let mut signers = Vec::with_capacity(4);
        let mut infos = Vec::with_capacity(4);
        for i in 0..4 {
            let signer = Ed25519Signer::generate(ValidatorId(i));
            infos.push(ValidatorInfo {
                id: signer.validator_id(),
                public_key: signer.public_key(),
                power: 1,
            });
            signers.push(signer);
        }
        (ValidatorSet::new(infos), signers)
    }

    /// Builds an engine over an in-memory store, a discarding network and a
    /// no-op application, and returns it with the sender side of its inbox.
    fn make_test_engine(
        vid: ValidatorId,
        vs: ValidatorSet,
        signer: Ed25519Signer,
    ) -> (
        ConsensusEngine,
        mpsc::Sender<(Option<ValidatorId>, ConsensusMessage)>,
    ) {
        let (tx, rx) = mpsc::channel(64);
        let block_store: Box<dyn crate::store::BlockStore> = Box::new(MemoryBlockStore::new());
        let config = EngineConfig {
            verifier: Box::new(Ed25519Verifier),
            pacemaker: None,
            persistence: None,
            evidence_store: None,
            wal: None,
            pending_epoch: None,
        };
        let engine = ConsensusEngine::new(
            ConsensusState::new(vid, vs),
            Arc::new(RwLock::new(block_store)),
            Box::new(DevNullNetwork),
            Box::new(NoopApplication),
            Box::new(signer),
            rx,
            config,
        );
        (engine, tx)
    }

    /// Builds an epoch-0 `Vote` QC over the genesis block hash at the genesis
    /// view, signed by the validators at `signer_indices`.
    fn genesis_vote_qc(
        signers: &[Ed25519Signer],
        signer_indices: impl IntoIterator<Item = usize>,
    ) -> QuorumCertificate {
        let chain_id_hash = test_chain_id_hash();
        let hash = BlockHash::GENESIS;
        let qc_view = ViewNumber::GENESIS;
        let vote_bytes = Vote::signing_bytes(
            &chain_id_hash,
            EpochNumber(0),
            qc_view,
            &hash,
            VoteType::Vote,
        );
        let mut agg = AggregateSignature::new(4);
        for i in signer_indices {
            agg.add(i, SignerTrait::sign(&signers[i], &vote_bytes))
                .unwrap();
        }
        QuorumCertificate {
            block_hash: hash,
            view: qc_view,
            aggregate_signature: agg,
            epoch: EpochNumber(0),
        }
    }

    /// Builds a `Propose` message for a height-1, view-1 block proposed and
    /// signed by validator 1, justified by `justify`.
    fn proposal_from_validator_1(
        signers: &[Ed25519Signer],
        justify: QuorumCertificate,
    ) -> ConsensusMessage {
        let chain_id_hash = test_chain_id_hash();

        let mut block = Block::genesis();
        block.height = Height(1);
        block.view = ViewNumber(1);
        block.proposer = ValidatorId(1);
        // The hash is computed last so it covers the fields set above.
        block.hash = block.compute_hash();

        let proposal_bytes = crate::view_protocol::proposal_signing_bytes(
            &chain_id_hash,
            EpochNumber(0),
            &block,
            &justify,
        );
        let signature = SignerTrait::sign(&signers[1], &proposal_bytes);

        ConsensusMessage::Propose {
            block: Box::new(block),
            justify: Box::new(justify),
            double_cert: None,
            signature,
        }
    }

    /// A proposal whose justify QC carries a single signature out of four
    /// must be rejected by `verify_message`.
    #[test]
    fn r29_propose_sub_quorum_justify_rejected_by_verify_message() {
        let (vs, signers) = make_validator_set_4();
        let engine_signer = Ed25519Signer::generate(ValidatorId(0));
        let (engine, _tx) = make_test_engine(ValidatorId(0), vs, engine_signer);

        // Only validator 1 signs the justify QC.
        let sub_quorum_qc = genesis_vote_qc(&signers, 1..2);
        let msg = proposal_from_validator_1(&signers, sub_quorum_qc);

        assert!(
            !engine.verify_message(&msg),
            "R-29 regression: Propose with sub-quorum justify QC must be rejected by verify_message"
        );
    }

    /// A proposal whose justify QC carries three signatures out of four must
    /// pass `verify_message`.
    #[test]
    fn r29_propose_full_quorum_justify_accepted_by_verify_message() {
        let (vs, signers) = make_validator_set_4();
        let engine_signer = Ed25519Signer::generate(ValidatorId(0));
        let (engine, _tx) = make_test_engine(ValidatorId(0), vs, engine_signer);

        // Validators 0, 1 and 2 sign the justify QC.
        let full_quorum_qc = genesis_vote_qc(&signers, 0..3);
        let msg = proposal_from_validator_1(&signers, full_quorum_qc);

        assert!(
            engine.verify_message(&msg),
            "R-29: Propose with full quorum justify QC must pass verify_message"
        );
    }

    /// `has_quorum` must reject a 1-of-4 aggregate and accept a 3-of-4 one.
    #[test]
    fn r32_sub_quorum_highest_qc_fails_has_quorum() {
        let (vs, signers) = make_validator_set_4();

        let chain_id_hash = test_chain_id_hash();
        let hash = BlockHash([1u8; 32]);
        let qc_view = ViewNumber(1);
        let vote_bytes = Vote::signing_bytes(
            &chain_id_hash,
            EpochNumber(0),
            qc_view,
            &hash,
            VoteType::Vote,
        );

        // Aggregates the signatures of the first `count` validators.
        let aggregate_of = |count: usize| {
            let mut agg = AggregateSignature::new(4);
            for (i, signer) in signers.iter().take(count).enumerate() {
                agg.add(i, SignerTrait::sign(signer, &vote_bytes))
                    .unwrap();
            }
            agg
        };

        let agg = aggregate_of(1);
        assert!(
            !hotmint_crypto::has_quorum(&vs, &agg),
            "R-32 regression: 1-of-4 signed QC must not satisfy has_quorum"
        );

        let agg_full = aggregate_of(3);
        assert!(
            hotmint_crypto::has_quorum(&vs, &agg_full),
            "R-32: 3-of-4 signed QC must satisfy has_quorum"
        );
    }
}