1use ruc::*;
2
3use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5
6use tokio::sync::RwLock;
7
8use crate::application::Application;
9use crate::commit::try_commit;
10use crate::leader;
11use crate::network::NetworkSink;
12use crate::pacemaker::{Pacemaker, PacemakerConfig};
13use crate::state::{ConsensusState, ViewStep};
14use crate::store::BlockStore;
15use crate::view_protocol::{self, ViewEntryTrigger};
16use crate::vote_collector::VoteCollector;
17
18use hotmint_types::epoch::Epoch;
19use hotmint_types::vote::VoteType;
20use hotmint_types::*;
21use tokio::sync::mpsc;
22use tracing::{info, warn};
23
24pub type SharedBlockStore = Arc<RwLock<Box<dyn BlockStore>>>;
26
27pub trait StatePersistence: Send {
29 fn save_current_view(&mut self, view: ViewNumber);
30 fn save_locked_qc(&mut self, qc: &QuorumCertificate);
31 fn save_highest_qc(&mut self, qc: &QuorumCertificate);
32 fn save_last_committed_height(&mut self, height: Height);
33 fn save_current_epoch(&mut self, epoch: &Epoch);
34 fn save_last_app_hash(&mut self, hash: BlockHash);
35 fn flush(&self);
36}
37
38pub struct ConsensusEngine {
39 state: ConsensusState,
40 store: SharedBlockStore,
41 network: Box<dyn NetworkSink>,
42 app: Box<dyn Application>,
43 signer: Box<dyn Signer>,
44 verifier: Box<dyn Verifier>,
45 vote_collector: VoteCollector,
46 pacemaker: Pacemaker,
47 pacemaker_config: PacemakerConfig,
48 msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
49 status_senders: HashSet<ValidatorId>,
51 current_view_qc: Option<QuorumCertificate>,
53 pending_epoch: Option<Epoch>,
55 persistence: Option<Box<dyn StatePersistence>>,
57}
58
59pub struct EngineConfig {
61 pub verifier: Box<dyn Verifier>,
62 pub pacemaker: Option<PacemakerConfig>,
63 pub persistence: Option<Box<dyn StatePersistence>>,
64}
65
66impl EngineConfig {
67 pub fn new(verifier: Box<dyn Verifier>) -> Self {
70 Self {
71 verifier,
72 pacemaker: None,
73 persistence: None,
74 }
75 }
76
77 pub fn with_pacemaker(mut self, pacemaker: PacemakerConfig) -> Self {
79 self.pacemaker = Some(pacemaker);
80 self
81 }
82
83 pub fn with_persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
85 self.persistence = Some(persistence);
86 self
87 }
88}
89
90pub struct ConsensusEngineBuilder {
106 state: Option<ConsensusState>,
107 store: Option<SharedBlockStore>,
108 network: Option<Box<dyn NetworkSink>>,
109 app: Option<Box<dyn Application>>,
110 signer: Option<Box<dyn Signer>>,
111 msg_rx: Option<mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>>,
112 verifier: Option<Box<dyn Verifier>>,
113 pacemaker: Option<PacemakerConfig>,
114 persistence: Option<Box<dyn StatePersistence>>,
115}
116
117impl ConsensusEngineBuilder {
118 pub fn new() -> Self {
119 Self {
120 state: None,
121 store: None,
122 network: None,
123 app: None,
124 signer: None,
125 msg_rx: None,
126 verifier: None,
127 pacemaker: None,
128 persistence: None,
129 }
130 }
131
132 pub fn state(mut self, state: ConsensusState) -> Self {
133 self.state = Some(state);
134 self
135 }
136
137 pub fn store(mut self, store: SharedBlockStore) -> Self {
138 self.store = Some(store);
139 self
140 }
141
142 pub fn network(mut self, network: Box<dyn NetworkSink>) -> Self {
143 self.network = Some(network);
144 self
145 }
146
147 pub fn app(mut self, app: Box<dyn Application>) -> Self {
148 self.app = Some(app);
149 self
150 }
151
152 pub fn signer(mut self, signer: Box<dyn Signer>) -> Self {
153 self.signer = Some(signer);
154 self
155 }
156
157 pub fn messages(
158 mut self,
159 msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
160 ) -> Self {
161 self.msg_rx = Some(msg_rx);
162 self
163 }
164
165 pub fn verifier(mut self, verifier: Box<dyn Verifier>) -> Self {
166 self.verifier = Some(verifier);
167 self
168 }
169
170 pub fn pacemaker(mut self, config: PacemakerConfig) -> Self {
171 self.pacemaker = Some(config);
172 self
173 }
174
175 pub fn persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
176 self.persistence = Some(persistence);
177 self
178 }
179
180 pub fn build(self) -> ruc::Result<ConsensusEngine> {
181 let state = self.state.ok_or_else(|| ruc::eg!("state is required"))?;
182 let store = self.store.ok_or_else(|| ruc::eg!("store is required"))?;
183 let network = self
184 .network
185 .ok_or_else(|| ruc::eg!("network is required"))?;
186 let app = self.app.ok_or_else(|| ruc::eg!("app is required"))?;
187 let signer = self.signer.ok_or_else(|| ruc::eg!("signer is required"))?;
188 let msg_rx = self
189 .msg_rx
190 .ok_or_else(|| ruc::eg!("messages (msg_rx) is required"))?;
191 let verifier = self
192 .verifier
193 .ok_or_else(|| ruc::eg!("verifier is required"))?;
194
195 let config = EngineConfig {
196 verifier,
197 pacemaker: self.pacemaker,
198 persistence: self.persistence,
199 };
200
201 Ok(ConsensusEngine::new(
202 state, store, network, app, signer, msg_rx, config,
203 ))
204 }
205}
206
207impl Default for ConsensusEngineBuilder {
208 fn default() -> Self {
209 Self::new()
210 }
211}
212
213impl ConsensusEngine {
214 pub fn new(
215 state: ConsensusState,
216 store: SharedBlockStore,
217 network: Box<dyn NetworkSink>,
218 app: Box<dyn Application>,
219 signer: Box<dyn Signer>,
220 msg_rx: mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>,
221 config: EngineConfig,
222 ) -> Self {
223 let pc = config.pacemaker.unwrap_or_default();
224 Self {
225 state,
226 store,
227 network,
228 app,
229 signer,
230 verifier: config.verifier,
231 vote_collector: VoteCollector::new(),
232 pacemaker: Pacemaker::with_config(pc.clone()),
233 pacemaker_config: pc,
234 msg_rx,
235 status_senders: HashSet::new(),
236 current_view_qc: None,
237 pending_epoch: None,
238 persistence: config.persistence,
239 }
240 }
241
242 pub async fn run(mut self) {
245 if self.state.current_view.as_u64() <= 1 {
246 self.enter_genesis_view().await;
247 } else {
248 info!(
249 validator = %self.state.validator_id,
250 view = %self.state.current_view,
251 height = %self.state.last_committed_height,
252 "resuming from persisted state"
253 );
254 self.pacemaker.reset_timer();
255 }
256
257 loop {
258 let deadline = self.pacemaker.sleep_until_deadline();
259 tokio::pin!(deadline);
260
261 tokio::select! {
262 Some((sender, msg)) = self.msg_rx.recv() => {
263 if let Err(e) = self.handle_message(sender, msg).await {
264 warn!(validator = %self.state.validator_id, error = %e, "error handling message");
265 }
266 }
267 _ = &mut deadline => {
268 self.handle_timeout().await;
269 }
270 }
271 }
272 }
273
274 async fn enter_genesis_view(&mut self) {
275 let genesis_qc = QuorumCertificate {
277 block_hash: BlockHash::GENESIS,
278 view: ViewNumber::GENESIS,
279 aggregate_signature: AggregateSignature::new(
280 self.state.validator_set.validator_count(),
281 ),
282 };
283 self.state.highest_qc = Some(genesis_qc);
284
285 let view = ViewNumber(1);
286 view_protocol::enter_view(
287 &mut self.state,
288 view,
289 ViewEntryTrigger::Genesis,
290 self.network.as_ref(),
291 self.signer.as_ref(),
292 );
293 self.pacemaker.reset_timer();
294
295 if self.state.is_leader() {
297 self.state.step = ViewStep::WaitingForStatus;
298 self.try_propose().await;
300 }
301 }
302
303 fn try_propose(
304 &mut self,
305 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
306 Box::pin(async move {
307 let mut store = self.store.write().await;
308 match view_protocol::propose(
309 &mut self.state,
310 store.as_mut(),
311 self.network.as_ref(),
312 self.app.as_ref(),
313 self.signer.as_ref(),
314 ) {
315 Ok(block) => {
316 drop(store);
317 self.leader_self_vote(block.hash).await;
319 }
320 Err(e) => {
321 warn!(
322 validator = %self.state.validator_id,
323 error = %e,
324 "failed to propose"
325 );
326 }
327 }
328 })
329 }
330
331 async fn leader_self_vote(&mut self, block_hash: BlockHash) {
332 let vote_bytes = Vote::signing_bytes(
333 &self.state.chain_id_hash,
334 self.state.current_view,
335 &block_hash,
336 VoteType::Vote,
337 );
338 let signature = self.signer.sign(&vote_bytes);
339 let vote = Vote {
340 block_hash,
341 view: self.state.current_view,
342 validator: self.state.validator_id,
343 signature,
344 vote_type: VoteType::Vote,
345 };
346 match self
347 .vote_collector
348 .add_vote(&self.state.validator_set, vote)
349 {
350 Ok(result) => {
351 self.handle_equivocation(&result);
352 if let Some(qc) = result.qc {
353 self.on_qc_formed(qc).await;
354 }
355 }
356 Err(e) => warn!(error = %e, "failed to add self vote"),
357 }
358 }
359}
360
361pub fn verify_relay_sender(
378 sender: ValidatorId,
379 msg: &ConsensusMessage,
380 validator_keys: &HashMap<ValidatorId, hotmint_types::crypto::PublicKey>,
381 ordered_validators: &[ValidatorId],
382 chain_id_hash: &[u8; 32],
383) -> bool {
384 use hotmint_crypto::Ed25519Verifier;
385 use hotmint_types::Verifier;
386 use hotmint_types::vote::Vote;
387 let verifier = Ed25519Verifier;
388 match msg {
389 ConsensusMessage::Propose {
390 block,
391 justify,
392 signature,
393 ..
394 } => {
395 let Some(pk) = validator_keys.get(&block.proposer) else {
396 return false;
397 };
398 let bytes = crate::view_protocol::proposal_signing_bytes(chain_id_hash, block, justify);
399 Verifier::verify(&verifier, pk, &bytes, signature)
400 }
401 ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
402 let Some(pk) = validator_keys.get(&vote.validator) else {
403 return false;
404 };
405 let bytes =
406 Vote::signing_bytes(chain_id_hash, vote.view, &vote.block_hash, vote.vote_type);
407 Verifier::verify(&verifier, pk, &bytes, &vote.signature)
408 }
409 ConsensusMessage::Prepare {
410 certificate,
411 signature,
412 } => {
413 if !ordered_validators.is_empty() {
416 let n = ordered_validators.len();
417 let expected_leader = ordered_validators[certificate.view.as_u64() as usize % n];
418 if sender != expected_leader {
419 return false;
420 }
421 }
422 let Some(pk) = validator_keys.get(&sender) else {
423 return false;
424 };
425 let bytes = crate::view_protocol::prepare_signing_bytes(chain_id_hash, certificate);
426 Verifier::verify(&verifier, pk, &bytes, signature)
427 }
428 ConsensusMessage::Wish {
429 target_view,
430 validator,
431 highest_qc,
432 signature,
433 } => {
434 let Some(pk) = validator_keys.get(validator) else {
435 return false;
436 };
437 let bytes = crate::pacemaker::wish_signing_bytes(
438 chain_id_hash,
439 *target_view,
440 highest_qc.as_ref(),
441 );
442 Verifier::verify(&verifier, pk, &bytes, signature)
443 }
444 ConsensusMessage::TimeoutCert(tc) => {
445 let target_view = ViewNumber(tc.view.as_u64() + 1);
447 let n = ordered_validators.len();
448 if n == 0 || tc.aggregate_signature.signers.len() != n {
449 return false;
450 }
451 let mut sig_idx = 0usize;
452 let mut verified_count = 0usize;
453 for (i, &signed) in tc.aggregate_signature.signers.iter().enumerate() {
454 if !signed {
455 continue;
456 }
457 if i >= n {
458 return false;
459 }
460 let vid = ordered_validators[i];
461 let Some(pk) = validator_keys.get(&vid) else {
462 return false;
463 };
464 let hqc = tc.highest_qcs.get(i).and_then(|h| h.as_ref());
465 let bytes = crate::pacemaker::wish_signing_bytes(chain_id_hash, target_view, hqc);
466 if sig_idx >= tc.aggregate_signature.signatures.len() {
467 return false;
468 }
469 if !Verifier::verify(
470 &verifier,
471 pk,
472 &bytes,
473 &tc.aggregate_signature.signatures[sig_idx],
474 ) {
475 return false;
476 }
477 sig_idx += 1;
478 verified_count += 1;
479 }
480 if sig_idx != tc.aggregate_signature.signatures.len() {
481 return false;
482 }
483 verified_count * 3 > n * 2
486 }
487 ConsensusMessage::StatusCert {
488 validator,
489 signature,
490 locked_qc,
491 ..
492 } => {
493 let Some(pk) = validator_keys.get(validator) else {
499 return false;
500 };
501 let _ = (pk, signature, locked_qc);
505 true
506 }
507 }
508}
509
510impl ConsensusEngine {
511 fn verify_message(&self, msg: &ConsensusMessage) -> bool {
515 let msg_view = match msg {
520 ConsensusMessage::Propose { .. } => None, ConsensusMessage::VoteMsg(v) | ConsensusMessage::Vote2Msg(v) => Some(v.view),
522 ConsensusMessage::Prepare { certificate, .. } => Some(certificate.view),
523 ConsensusMessage::Wish { target_view, .. } => Some(*target_view),
524 ConsensusMessage::TimeoutCert(tc) => Some(ViewNumber(tc.view.as_u64() + 1)),
525 ConsensusMessage::StatusCert { .. } => None,
526 };
527 if let Some(v) = msg_view
528 && v < self.state.current_view
529 {
530 return true; }
532
533 let vs = &self.state.validator_set;
534 match msg {
535 ConsensusMessage::Propose {
536 block,
537 justify,
538 signature,
539 ..
540 } => {
541 let proposer = vs.get(block.proposer);
542 let Some(vi) = proposer else {
543 warn!(proposer = %block.proposer, "propose from unknown validator");
544 return false;
545 };
546 let bytes = view_protocol::proposal_signing_bytes(
547 &self.state.chain_id_hash,
548 block,
549 justify,
550 );
551 if !self.verifier.verify(&vi.public_key, &bytes, signature) {
552 warn!(proposer = %block.proposer, "invalid proposal signature");
553 return false;
554 }
555 if justify.aggregate_signature.count() > 0 {
557 let qc_bytes = Vote::signing_bytes(
558 &self.state.chain_id_hash,
559 justify.view,
560 &justify.block_hash,
561 VoteType::Vote,
562 );
563 if !self
564 .verifier
565 .verify_aggregate(vs, &qc_bytes, &justify.aggregate_signature)
566 {
567 warn!(proposer = %block.proposer, "invalid justify QC aggregate signature");
568 return false;
569 }
570 if !hotmint_crypto::has_quorum(vs, &justify.aggregate_signature) {
571 warn!(proposer = %block.proposer, "justify QC below quorum threshold");
572 return false;
573 }
574 }
575 true
576 }
577 ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
578 let Some(vi) = vs.get(vote.validator) else {
579 warn!(validator = %vote.validator, "vote from unknown validator");
580 return false;
581 };
582 let bytes = Vote::signing_bytes(
583 &self.state.chain_id_hash,
584 vote.view,
585 &vote.block_hash,
586 vote.vote_type,
587 );
588 if !self
589 .verifier
590 .verify(&vi.public_key, &bytes, &vote.signature)
591 {
592 warn!(validator = %vote.validator, "invalid vote signature");
593 return false;
594 }
595 true
596 }
597 ConsensusMessage::Prepare {
598 certificate,
599 signature,
600 } => {
601 let Some(leader) = vs.leader_for_view(certificate.view) else {
603 return false;
604 };
605 let bytes =
606 view_protocol::prepare_signing_bytes(&self.state.chain_id_hash, certificate);
607 if !self.verifier.verify(&leader.public_key, &bytes, signature) {
608 warn!(view = %certificate.view, "invalid prepare signature");
609 return false;
610 }
611 let qc_bytes = Vote::signing_bytes(
613 &self.state.chain_id_hash,
614 certificate.view,
615 &certificate.block_hash,
616 VoteType::Vote,
617 );
618 if !self
619 .verifier
620 .verify_aggregate(vs, &qc_bytes, &certificate.aggregate_signature)
621 {
622 warn!(view = %certificate.view, "invalid QC aggregate signature");
623 return false;
624 }
625 if !hotmint_crypto::has_quorum(vs, &certificate.aggregate_signature) {
626 warn!(view = %certificate.view, "Prepare QC below quorum threshold");
627 return false;
628 }
629 true
630 }
631 ConsensusMessage::Wish {
632 target_view,
633 validator,
634 highest_qc,
635 signature,
636 } => {
637 let Some(vi) = vs.get(*validator) else {
638 warn!(validator = %validator, "wish from unknown validator");
639 return false;
640 };
641 let bytes = crate::pacemaker::wish_signing_bytes(
643 &self.state.chain_id_hash,
644 *target_view,
645 highest_qc.as_ref(),
646 );
647 if !self.verifier.verify(&vi.public_key, &bytes, signature) {
648 warn!(validator = %validator, "invalid wish signature");
649 return false;
650 }
651 true
652 }
653 ConsensusMessage::TimeoutCert(tc) => {
654 let target_view = ViewNumber(tc.view.as_u64() + 1);
660 let n = vs.validator_count();
661 if tc.aggregate_signature.signers.len() != n {
662 warn!(view = %tc.view, "TC signers bitfield length mismatch");
663 return false;
664 }
665 let mut sig_idx = 0usize;
666 let mut power = 0u64;
667 for (i, &signed) in tc.aggregate_signature.signers.iter().enumerate() {
668 if !signed {
669 continue;
670 }
671 let Some(vi) = vs.validators().get(i) else {
672 warn!(view = %tc.view, validator_idx = i, "TC signer index out of validator set");
673 return false;
674 };
675 let hqc = tc.highest_qcs.get(i).and_then(|h| h.as_ref());
676 let bytes = crate::pacemaker::wish_signing_bytes(
677 &self.state.chain_id_hash,
678 target_view,
679 hqc,
680 );
681 if sig_idx >= tc.aggregate_signature.signatures.len() {
682 warn!(view = %tc.view, "TC aggregate_signature has fewer sigs than signers");
683 return false;
684 }
685 if !self.verifier.verify(
686 &vi.public_key,
687 &bytes,
688 &tc.aggregate_signature.signatures[sig_idx],
689 ) {
690 warn!(view = %tc.view, validator = %vi.id, "TC signer signature invalid");
691 return false;
692 }
693 power += vs.power_of(vi.id);
694 sig_idx += 1;
695 }
696 if sig_idx != tc.aggregate_signature.signatures.len() {
697 warn!(view = %tc.view, "TC has extra signatures beyond bitfield");
698 return false;
699 }
700 if power < vs.quorum_threshold() {
701 warn!(view = %tc.view, power, threshold = vs.quorum_threshold(), "TC insufficient quorum");
702 return false;
703 }
704 true
705 }
706 ConsensusMessage::StatusCert {
707 locked_qc,
708 validator,
709 signature,
710 } => {
711 let Some(vi) = vs.get(*validator) else {
712 warn!(validator = %validator, "status from unknown validator");
713 return false;
714 };
715 let bytes = view_protocol::status_signing_bytes(
716 &self.state.chain_id_hash,
717 self.state.current_view,
718 locked_qc,
719 );
720 if !self.verifier.verify(&vi.public_key, &bytes, signature) {
721 warn!(validator = %validator, "invalid status signature");
722 return false;
723 }
724 true
725 }
726 }
727 }
728
729 async fn handle_message(
730 &mut self,
731 _sender: Option<ValidatorId>,
732 msg: ConsensusMessage,
733 ) -> Result<()> {
734 if !self.verify_message(&msg) {
735 return Ok(());
736 }
737
738 match msg {
739 ConsensusMessage::Propose {
740 block,
741 justify,
742 double_cert,
743 signature: _,
744 } => {
745 let block = *block;
746 let justify = *justify;
747 let double_cert = double_cert.map(|dc| *dc);
748
749 if block.view > self.state.current_view {
751 if let Some(ref dc) = double_cert {
752 if !self.validate_double_cert(dc) {
753 return Ok(());
754 }
755
756 self.apply_commit(dc, "fast-forward").await;
758 self.state.highest_double_cert = Some(dc.clone());
759 self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()))
760 .await;
761 } else {
762 return Ok(());
763 }
764 } else if block.view < self.state.current_view {
765 if block.height > self.state.last_committed_height {
771 let expected = hotmint_crypto::compute_block_hash(&block);
773 if block.hash == expected {
774 let mut store = self.store.write().await;
775 store.put_block(block);
776 }
777 }
778 return Ok(());
779 }
780
781 let mut store = self.store.write().await;
782
783 if let Some(ref dc) = double_cert
788 && !self.validate_double_cert(dc)
789 {
790 return Ok(());
791 }
792
793 if justify.aggregate_signature.count() > 0
799 && let Some(justified_block) = store.get_block(&justify.block_hash)
800 && store.get_commit_qc(justified_block.height).is_none()
801 {
802 store.put_commit_qc(justified_block.height, justify.clone());
803 }
804
805 let maybe_pending = view_protocol::on_proposal(
806 &mut self.state,
807 view_protocol::ProposalData {
808 block,
809 justify,
810 double_cert,
811 },
812 store.as_mut(),
813 self.network.as_ref(),
814 self.app.as_ref(),
815 self.signer.as_ref(),
816 )
817 .c(d!())?;
818 drop(store);
819
820 if let Some(epoch) = maybe_pending {
821 self.pending_epoch = Some(epoch);
822 }
823 }
824
825 ConsensusMessage::VoteMsg(vote) => {
826 if vote.view != self.state.current_view {
827 return Ok(());
828 }
829 if !self.state.is_leader() {
830 return Ok(());
831 }
832 if vote.vote_type != VoteType::Vote {
833 return Ok(());
834 }
835
836 let result = self
837 .vote_collector
838 .add_vote(&self.state.validator_set, vote)
839 .c(d!())?;
840 self.handle_equivocation(&result);
841 if let Some(qc) = result.qc {
842 self.on_qc_formed(qc).await;
843 }
844 }
845
846 ConsensusMessage::Prepare {
847 certificate,
848 signature: _,
849 } => {
850 if certificate.view < self.state.current_view {
851 return Ok(());
852 }
853 if certificate.view == self.state.current_view {
854 if self.app.tracks_app_hash() {
859 let store = self.store.read().await;
860 if let Some(block) = store.get_block(&certificate.block_hash)
861 && block.app_hash != self.state.last_app_hash
862 {
863 warn!(
864 block_app_hash = %block.app_hash,
865 local_app_hash = %self.state.last_app_hash,
866 "prepare block app_hash mismatch, ignoring"
867 );
868 return Ok(());
869 }
870 }
871 view_protocol::on_prepare(
872 &mut self.state,
873 certificate,
874 self.network.as_ref(),
875 self.signer.as_ref(),
876 );
877 }
878 }
879
880 ConsensusMessage::Vote2Msg(vote) => {
881 if vote.view != self.state.current_view {
882 return Ok(());
883 }
884 let result = self
885 .vote_collector
886 .add_vote(&self.state.validator_set, vote)
887 .c(d!())?;
888 self.handle_equivocation(&result);
889 if let Some(outer_qc) = result.qc {
890 self.on_double_cert_formed(outer_qc).await;
891 }
892 }
893
894 ConsensusMessage::Wish {
895 target_view,
896 validator,
897 highest_qc,
898 signature,
899 } => {
900 if let Some(ref qc) = highest_qc
903 && qc.aggregate_signature.count() > 0
904 {
905 let qc_bytes = Vote::signing_bytes(
906 &self.state.chain_id_hash,
907 qc.view,
908 &qc.block_hash,
909 VoteType::Vote,
910 );
911 if !self.verifier.verify_aggregate(
912 &self.state.validator_set,
913 &qc_bytes,
914 &qc.aggregate_signature,
915 ) {
916 warn!(validator = %validator, "wish carries invalid highest_qc signature");
917 return Ok(());
918 }
919 if !hotmint_crypto::has_quorum(
920 &self.state.validator_set,
921 &qc.aggregate_signature,
922 ) {
923 warn!(validator = %validator, "wish carries highest_qc without quorum");
924 return Ok(());
925 }
926 }
927
928 if let Some(tc) = self.pacemaker.add_wish(
929 &self.state.validator_set,
930 target_view,
931 validator,
932 highest_qc,
933 signature,
934 ) {
935 info!(
936 validator = %self.state.validator_id,
937 view = %tc.view,
938 "TC formed, advancing view"
939 );
940 self.network
941 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
942 self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
943 }
944 }
945
946 ConsensusMessage::TimeoutCert(tc) => {
947 if self.pacemaker.should_relay_tc(&tc) {
948 self.network
949 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
950 }
951 let new_view = ViewNumber(tc.view.as_u64() + 1);
952 if new_view > self.state.current_view {
953 self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
954 }
955 }
956
957 ConsensusMessage::StatusCert {
958 locked_qc,
959 validator,
960 signature: _,
961 } => {
962 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
963 if let Some(ref qc) = locked_qc {
964 self.state.update_highest_qc(qc);
965 }
966 self.status_senders.insert(validator);
967 let status_power: u64 = self
968 .status_senders
969 .iter()
970 .map(|v| self.state.validator_set.power_of(*v))
971 .sum();
972 let total_power =
974 status_power + self.state.validator_set.power_of(self.state.validator_id);
975 if total_power >= self.state.validator_set.quorum_threshold() {
976 self.try_propose().await;
977 }
978 }
979 }
980 }
981 Ok(())
982 }
983
984 fn handle_equivocation(&self, result: &crate::vote_collector::VoteResult) {
985 if let Some(ref proof) = result.equivocation {
986 warn!(
987 validator = %proof.validator,
988 view = %proof.view,
989 "equivocation detected!"
990 );
991 if let Err(e) = self.app.on_evidence(proof) {
992 warn!(error = %e, "on_evidence callback failed");
993 }
994 }
995 }
996
997 async fn on_qc_formed(&mut self, qc: QuorumCertificate) {
998 self.current_view_qc = Some(qc.clone());
1000
1001 view_protocol::on_votes_collected(
1002 &mut self.state,
1003 qc.clone(),
1004 self.network.as_ref(),
1005 self.signer.as_ref(),
1006 );
1007
1008 let vote_bytes = Vote::signing_bytes(
1010 &self.state.chain_id_hash,
1011 self.state.current_view,
1012 &qc.block_hash,
1013 VoteType::Vote2,
1014 );
1015 let signature = self.signer.sign(&vote_bytes);
1016 let vote = Vote {
1017 block_hash: qc.block_hash,
1018 view: self.state.current_view,
1019 validator: self.state.validator_id,
1020 signature,
1021 vote_type: VoteType::Vote2,
1022 };
1023
1024 self.state.update_locked_qc(&qc);
1026
1027 let next_leader_id =
1028 leader::next_leader(&self.state.validator_set, self.state.current_view);
1029 if next_leader_id == self.state.validator_id {
1030 match self
1032 .vote_collector
1033 .add_vote(&self.state.validator_set, vote)
1034 {
1035 Ok(result) => {
1036 self.handle_equivocation(&result);
1037 if let Some(outer_qc) = result.qc {
1038 self.on_double_cert_formed(outer_qc).await;
1039 }
1040 }
1041 Err(e) => warn!(error = %e, "failed to add self vote2"),
1042 }
1043 } else {
1044 self.network
1045 .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
1046 }
1047 }
1048
1049 async fn on_double_cert_formed(&mut self, outer_qc: QuorumCertificate) {
1050 let inner_qc = match self.current_view_qc.take() {
1052 Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
1053 _ => {
1054 match &self.state.locked_qc {
1056 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
1057 _ => match &self.state.highest_qc {
1058 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
1059 _ => {
1060 warn!(
1061 validator = %self.state.validator_id,
1062 "double cert formed but can't find matching inner QC"
1063 );
1064 return;
1065 }
1066 },
1067 }
1068 }
1069 };
1070
1071 let dc = DoubleCertificate { inner_qc, outer_qc };
1072
1073 info!(
1074 validator = %self.state.validator_id,
1075 view = %self.state.current_view,
1076 hash = %dc.inner_qc.block_hash,
1077 "double certificate formed, committing"
1078 );
1079
1080 self.apply_commit(&dc, "double-cert").await;
1082
1083 self.state.highest_double_cert = Some(dc.clone());
1084
1085 self.advance_view(ViewEntryTrigger::DoubleCert(dc)).await;
1087 }
1088
1089 async fn handle_timeout(&mut self) {
1090 let has_power = self.state.validator_set.power_of(self.state.validator_id) > 0;
1094 if !has_power {
1095 self.pacemaker.on_timeout();
1096 return;
1097 }
1098
1099 info!(
1100 validator = %self.state.validator_id,
1101 view = %self.state.current_view,
1102 "view timeout, sending wish"
1103 );
1104
1105 let wish = self.pacemaker.build_wish(
1106 &self.state.chain_id_hash,
1107 self.state.current_view,
1108 self.state.validator_id,
1109 self.state.highest_qc.clone(),
1110 self.signer.as_ref(),
1111 );
1112
1113 self.network.broadcast(wish.clone());
1114
1115 if let ConsensusMessage::Wish {
1117 target_view,
1118 validator,
1119 highest_qc,
1120 signature,
1121 } = wish
1122 && let Some(tc) = self.pacemaker.add_wish(
1123 &self.state.validator_set,
1124 target_view,
1125 validator,
1126 highest_qc,
1127 signature,
1128 )
1129 {
1130 self.network
1131 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
1132 self.advance_view(ViewEntryTrigger::TimeoutCert(tc)).await;
1133 return;
1134 }
1135
1136 self.pacemaker.on_timeout();
1138 }
1139
1140 async fn apply_commit(&mut self, dc: &DoubleCertificate, context: &str) {
1143 let store = self.store.read().await;
1144 match try_commit(
1145 dc,
1146 store.as_ref(),
1147 self.app.as_ref(),
1148 &mut self.state.last_committed_height,
1149 &self.state.current_epoch,
1150 ) {
1151 Ok(result) => {
1152 if !result.committed_blocks.is_empty() {
1153 self.state.last_app_hash = result.last_app_hash;
1154 }
1155 if result.pending_epoch.is_some() {
1156 self.pending_epoch = result.pending_epoch;
1157 }
1158 drop(store);
1159 {
1160 let mut s = self.store.write().await;
1161 for block in &result.committed_blocks {
1162 if result.commit_qc.block_hash == block.hash {
1168 s.put_commit_qc(block.height, result.commit_qc.clone());
1169 }
1170 }
1171 s.flush();
1172 }
1173 }
1174 Err(e) => {
1175 warn!(error = %e, "try_commit failed during {context}");
1176 drop(store);
1177 }
1178 }
1179 }
1180
1181 fn validate_double_cert(&self, dc: &DoubleCertificate) -> bool {
1193 if dc.inner_qc.block_hash != dc.outer_qc.block_hash {
1194 warn!("double cert inner/outer block_hash mismatch");
1195 return false;
1196 }
1197 let vs = &self.state.validator_set;
1198 let inner_bytes = Vote::signing_bytes(
1199 &self.state.chain_id_hash,
1200 dc.inner_qc.view,
1201 &dc.inner_qc.block_hash,
1202 VoteType::Vote,
1203 );
1204 if !self
1205 .verifier
1206 .verify_aggregate(vs, &inner_bytes, &dc.inner_qc.aggregate_signature)
1207 {
1208 warn!("double cert inner QC signature invalid");
1209 return false;
1210 }
1211 if !hotmint_crypto::has_quorum(vs, &dc.inner_qc.aggregate_signature) {
1212 warn!("double cert inner QC below quorum threshold");
1213 return false;
1214 }
1215 let outer_bytes = Vote::signing_bytes(
1216 &self.state.chain_id_hash,
1217 dc.outer_qc.view,
1218 &dc.outer_qc.block_hash,
1219 VoteType::Vote2,
1220 );
1221 if !self
1222 .verifier
1223 .verify_aggregate(vs, &outer_bytes, &dc.outer_qc.aggregate_signature)
1224 {
1225 warn!("double cert outer QC signature invalid");
1226 return false;
1227 }
1228 if !hotmint_crypto::has_quorum(vs, &dc.outer_qc.aggregate_signature) {
1229 warn!("double cert outer QC below quorum threshold");
1230 return false;
1231 }
1232 true
1233 }
1234
1235 fn persist_state(&mut self) {
1236 if let Some(p) = self.persistence.as_mut() {
1237 p.save_current_view(self.state.current_view);
1238 if let Some(ref qc) = self.state.locked_qc {
1239 p.save_locked_qc(qc);
1240 }
1241 if let Some(ref qc) = self.state.highest_qc {
1242 p.save_highest_qc(qc);
1243 }
1244 p.save_last_committed_height(self.state.last_committed_height);
1245 p.save_current_epoch(&self.state.current_epoch);
1246 p.save_last_app_hash(self.state.last_app_hash);
1247 p.flush();
1248 }
1249 }
1250
1251 async fn advance_view(&mut self, trigger: ViewEntryTrigger) {
1252 let new_view = match &trigger {
1253 ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
1254 ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
1255 ViewEntryTrigger::Genesis => ViewNumber(1),
1256 };
1257 self.advance_view_to(new_view, trigger).await;
1258 }
1259
1260 async fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
1261 if new_view <= self.state.current_view {
1262 return;
1263 }
1264
1265 let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
1267
1268 self.vote_collector.clear_view(self.state.current_view);
1269 self.vote_collector.prune_before(self.state.current_view);
1270 self.pacemaker.clear_view(self.state.current_view);
1271 self.pacemaker.prune_before(self.state.current_view);
1272 self.status_senders.clear();
1273 self.current_view_qc = None;
1274
1275 if let Some(ref epoch) = self.pending_epoch
1279 && new_view >= epoch.start_view
1280 {
1281 let new_epoch = self.pending_epoch.take().unwrap();
1282 info!(
1283 validator = %self.state.validator_id,
1284 old_epoch = %self.state.current_epoch.number,
1285 new_epoch = %new_epoch.number,
1286 start_view = %new_epoch.start_view,
1287 validators = new_epoch.validator_set.validator_count(),
1288 "epoch transition"
1289 );
1290 self.state.validator_set = new_epoch.validator_set.clone();
1291 self.state.current_epoch = new_epoch;
1292 self.network.on_epoch_change(&self.state.validator_set);
1294 self.vote_collector = VoteCollector::new();
1296 self.pacemaker = Pacemaker::with_config(self.pacemaker_config.clone());
1297 }
1298
1299 view_protocol::enter_view(
1300 &mut self.state,
1301 new_view,
1302 trigger,
1303 self.network.as_ref(),
1304 self.signer.as_ref(),
1305 );
1306
1307 if is_progress {
1308 self.pacemaker.reset_on_progress();
1309 } else {
1310 self.pacemaker.reset_timer();
1311 }
1312
1313 self.persist_state();
1314
1315 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
1321 self.try_propose().await;
1322 }
1323 }
1324}
1325
1326#[cfg(test)]
1331mod tests {
1332 use super::*;
1333
1334 use std::sync::Arc;
1335
1336 use tokio::sync::RwLock;
1337
1338 use hotmint_crypto::{Ed25519Signer, Ed25519Verifier};
1339 use hotmint_types::Signer as SignerTrait;
1340 use hotmint_types::certificate::QuorumCertificate;
1341 use hotmint_types::crypto::AggregateSignature;
1342 use hotmint_types::validator::{ValidatorId, ValidatorInfo};
1343 use hotmint_types::vote::{Vote, VoteType};
1344 use tokio::sync::mpsc;
1345
1346 use crate::application::NoopApplication;
1347 use crate::network::NetworkSink;
1348 use crate::state::ConsensusState;
1349 use crate::store::MemoryBlockStore;
1350
1351 struct DevNullNetwork;
1353 impl NetworkSink for DevNullNetwork {
1354 fn broadcast(&self, _: ConsensusMessage) {}
1355 fn send_to(&self, _: ValidatorId, _: ConsensusMessage) {}
1356 }
1357
1358 fn test_chain_id_hash() -> [u8; 32] {
1360 *blake3::hash(b"").as_bytes()
1361 }
1362
1363 fn make_validator_set_4() -> (ValidatorSet, Vec<Ed25519Signer>) {
1364 let signers: Vec<Ed25519Signer> = (0..4)
1365 .map(|i| Ed25519Signer::generate(ValidatorId(i)))
1366 .collect();
1367 let infos: Vec<ValidatorInfo> = signers
1368 .iter()
1369 .map(|s| ValidatorInfo {
1370 id: s.validator_id(),
1371 public_key: s.public_key(),
1372 power: 1,
1373 })
1374 .collect();
1375 (ValidatorSet::new(infos), signers)
1376 }
1377
1378 fn make_test_engine(
1379 vid: ValidatorId,
1380 vs: ValidatorSet,
1381 signer: Ed25519Signer,
1382 ) -> (
1383 ConsensusEngine,
1384 mpsc::Sender<(Option<ValidatorId>, ConsensusMessage)>,
1385 ) {
1386 let (tx, rx) = mpsc::channel(64);
1387 let store = Arc::new(RwLock::new(
1388 Box::new(MemoryBlockStore::new()) as Box<dyn crate::store::BlockStore>
1389 ));
1390 let state = ConsensusState::new(vid, vs);
1391 let engine = ConsensusEngine::new(
1392 state,
1393 store,
1394 Box::new(DevNullNetwork),
1395 Box::new(NoopApplication),
1396 Box::new(signer),
1397 rx,
1398 EngineConfig {
1399 verifier: Box::new(Ed25519Verifier),
1400 pacemaker: None,
1401 persistence: None,
1402 },
1403 );
1404 (engine, tx)
1405 }
1406
1407 #[test]
1410 fn r29_propose_sub_quorum_justify_rejected_by_verify_message() {
1411 let (vs, signers) = make_validator_set_4();
1412 let engine_signer = Ed25519Signer::generate(ValidatorId(0));
1415 let (engine, _tx) = make_test_engine(ValidatorId(0), vs.clone(), engine_signer);
1416
1417 let chain_id_hash = test_chain_id_hash();
1419 let hash = BlockHash::GENESIS;
1420 let qc_view = ViewNumber::GENESIS;
1421 let vote_bytes = Vote::signing_bytes(&chain_id_hash, qc_view, &hash, VoteType::Vote);
1422 let mut agg = AggregateSignature::new(4);
1423 agg.add(1, SignerTrait::sign(&signers[1], &vote_bytes))
1424 .unwrap();
1425 let sub_quorum_qc = QuorumCertificate {
1426 block_hash: hash,
1427 view: qc_view,
1428 aggregate_signature: agg,
1429 };
1430
1431 let mut block = Block::genesis();
1433 block.height = Height(1);
1434 block.view = ViewNumber(1);
1435 block.proposer = ValidatorId(1);
1436 block.hash = block.compute_hash();
1437 let proposal_bytes =
1438 crate::view_protocol::proposal_signing_bytes(&chain_id_hash, &block, &sub_quorum_qc);
1439 let signature = SignerTrait::sign(&signers[1], &proposal_bytes);
1440
1441 let msg = ConsensusMessage::Propose {
1442 block: Box::new(block),
1443 justify: Box::new(sub_quorum_qc),
1444 double_cert: None,
1445 signature,
1446 };
1447
1448 assert!(
1449 !engine.verify_message(&msg),
1450 "R-29 regression: Propose with sub-quorum justify QC must be rejected by verify_message"
1451 );
1452 }
1453
1454 #[test]
1456 fn r29_propose_full_quorum_justify_accepted_by_verify_message() {
1457 let (vs, signers) = make_validator_set_4();
1458 let engine_signer = Ed25519Signer::generate(ValidatorId(0));
1459 let (engine, _tx) = make_test_engine(ValidatorId(0), vs.clone(), engine_signer);
1460
1461 let chain_id_hash = test_chain_id_hash();
1462 let hash = BlockHash::GENESIS;
1463 let qc_view = ViewNumber::GENESIS;
1464 let vote_bytes = Vote::signing_bytes(&chain_id_hash, qc_view, &hash, VoteType::Vote);
1465 let mut agg = AggregateSignature::new(4);
1467 for (i, signer) in signers.iter().take(3).enumerate() {
1468 agg.add(i, SignerTrait::sign(signer, &vote_bytes)).unwrap();
1469 }
1470 let full_quorum_qc = QuorumCertificate {
1471 block_hash: hash,
1472 view: qc_view,
1473 aggregate_signature: agg,
1474 };
1475
1476 let mut block = Block::genesis();
1477 block.height = Height(1);
1478 block.view = ViewNumber(1);
1479 block.proposer = ValidatorId(1);
1480 block.hash = block.compute_hash();
1481 let proposal_bytes =
1482 crate::view_protocol::proposal_signing_bytes(&chain_id_hash, &block, &full_quorum_qc);
1483 let signature = SignerTrait::sign(&signers[1], &proposal_bytes);
1484
1485 let msg = ConsensusMessage::Propose {
1486 block: Box::new(block),
1487 justify: Box::new(full_quorum_qc),
1488 double_cert: None,
1489 signature,
1490 };
1491
1492 assert!(
1493 engine.verify_message(&msg),
1494 "R-29: Propose with full quorum justify QC must pass verify_message"
1495 );
1496 }
1497
1498 #[test]
1506 fn r32_sub_quorum_highest_qc_fails_has_quorum() {
1507 let (vs, signers) = make_validator_set_4();
1508
1509 let chain_id_hash = test_chain_id_hash();
1510 let hash = BlockHash([1u8; 32]);
1511 let qc_view = ViewNumber(1);
1512 let vote_bytes = Vote::signing_bytes(&chain_id_hash, qc_view, &hash, VoteType::Vote);
1513
1514 let mut agg = AggregateSignature::new(4);
1516 agg.add(0, SignerTrait::sign(&signers[0], &vote_bytes))
1517 .unwrap();
1518
1519 assert!(
1520 !hotmint_crypto::has_quorum(&vs, &agg),
1521 "R-32 regression: 1-of-4 signed QC must not satisfy has_quorum"
1522 );
1523
1524 let mut agg_full = AggregateSignature::new(4);
1526 for (i, signer) in signers.iter().take(3).enumerate() {
1527 agg_full
1528 .add(i, SignerTrait::sign(signer, &vote_bytes))
1529 .unwrap();
1530 }
1531 assert!(
1532 hotmint_crypto::has_quorum(&vs, &agg_full),
1533 "R-32: 3-of-4 signed QC must satisfy has_quorum"
1534 );
1535 }
1536}