1use ruc::*;
2
3use std::collections::HashSet;
4use std::sync::{Arc, RwLock};
5
6use crate::application::Application;
7use crate::commit::try_commit;
8use crate::leader;
9use crate::network::NetworkSink;
10use crate::pacemaker::{Pacemaker, PacemakerConfig};
11use crate::state::{ConsensusState, ViewStep};
12use crate::store::BlockStore;
13use crate::view_protocol::{self, ViewEntryTrigger};
14use crate::vote_collector::VoteCollector;
15
16use hotmint_types::epoch::Epoch;
17use hotmint_types::vote::VoteType;
18use hotmint_types::*;
19use tokio::sync::mpsc;
20use tracing::{info, warn};
21
22pub type SharedBlockStore = Arc<RwLock<Box<dyn BlockStore>>>;
24
25pub trait StatePersistence: Send {
27 fn save_current_view(&mut self, view: ViewNumber);
28 fn save_locked_qc(&mut self, qc: &QuorumCertificate);
29 fn save_highest_qc(&mut self, qc: &QuorumCertificate);
30 fn save_last_committed_height(&mut self, height: Height);
31 fn save_current_epoch(&mut self, epoch: &Epoch);
32 fn flush(&self);
33}
34
35pub struct ConsensusEngine {
36 state: ConsensusState,
37 store: SharedBlockStore,
38 network: Box<dyn NetworkSink>,
39 app: Box<dyn Application>,
40 signer: Box<dyn Signer>,
41 verifier: Box<dyn Verifier>,
42 vote_collector: VoteCollector,
43 pacemaker: Pacemaker,
44 pacemaker_config: PacemakerConfig,
45 msg_rx: mpsc::Receiver<(ValidatorId, ConsensusMessage)>,
46 status_senders: HashSet<ValidatorId>,
48 current_view_qc: Option<QuorumCertificate>,
50 pending_epoch: Option<Epoch>,
52 persistence: Option<Box<dyn StatePersistence>>,
54}
55
56pub struct EngineConfig {
58 pub verifier: Box<dyn Verifier>,
59 pub pacemaker: Option<PacemakerConfig>,
60 pub persistence: Option<Box<dyn StatePersistence>>,
61}
62
63impl EngineConfig {
64 pub fn new(verifier: Box<dyn Verifier>) -> Self {
67 Self {
68 verifier,
69 pacemaker: None,
70 persistence: None,
71 }
72 }
73
74 pub fn with_pacemaker(mut self, pacemaker: PacemakerConfig) -> Self {
76 self.pacemaker = Some(pacemaker);
77 self
78 }
79
80 pub fn with_persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
82 self.persistence = Some(persistence);
83 self
84 }
85}
86
87pub struct ConsensusEngineBuilder {
103 state: Option<ConsensusState>,
104 store: Option<SharedBlockStore>,
105 network: Option<Box<dyn NetworkSink>>,
106 app: Option<Box<dyn Application>>,
107 signer: Option<Box<dyn Signer>>,
108 msg_rx: Option<mpsc::Receiver<(ValidatorId, ConsensusMessage)>>,
109 verifier: Option<Box<dyn Verifier>>,
110 pacemaker: Option<PacemakerConfig>,
111 persistence: Option<Box<dyn StatePersistence>>,
112}
113
114impl ConsensusEngineBuilder {
115 pub fn new() -> Self {
116 Self {
117 state: None,
118 store: None,
119 network: None,
120 app: None,
121 signer: None,
122 msg_rx: None,
123 verifier: None,
124 pacemaker: None,
125 persistence: None,
126 }
127 }
128
129 pub fn state(mut self, state: ConsensusState) -> Self {
130 self.state = Some(state);
131 self
132 }
133
134 pub fn store(mut self, store: SharedBlockStore) -> Self {
135 self.store = Some(store);
136 self
137 }
138
139 pub fn network(mut self, network: Box<dyn NetworkSink>) -> Self {
140 self.network = Some(network);
141 self
142 }
143
144 pub fn app(mut self, app: Box<dyn Application>) -> Self {
145 self.app = Some(app);
146 self
147 }
148
149 pub fn signer(mut self, signer: Box<dyn Signer>) -> Self {
150 self.signer = Some(signer);
151 self
152 }
153
154 pub fn messages(mut self, msg_rx: mpsc::Receiver<(ValidatorId, ConsensusMessage)>) -> Self {
155 self.msg_rx = Some(msg_rx);
156 self
157 }
158
159 pub fn verifier(mut self, verifier: Box<dyn Verifier>) -> Self {
160 self.verifier = Some(verifier);
161 self
162 }
163
164 pub fn pacemaker(mut self, config: PacemakerConfig) -> Self {
165 self.pacemaker = Some(config);
166 self
167 }
168
169 pub fn persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
170 self.persistence = Some(persistence);
171 self
172 }
173
174 pub fn build(self) -> ruc::Result<ConsensusEngine> {
175 let state = self.state.ok_or_else(|| ruc::eg!("state is required"))?;
176 let store = self.store.ok_or_else(|| ruc::eg!("store is required"))?;
177 let network = self
178 .network
179 .ok_or_else(|| ruc::eg!("network is required"))?;
180 let app = self.app.ok_or_else(|| ruc::eg!("app is required"))?;
181 let signer = self.signer.ok_or_else(|| ruc::eg!("signer is required"))?;
182 let msg_rx = self
183 .msg_rx
184 .ok_or_else(|| ruc::eg!("messages (msg_rx) is required"))?;
185 let verifier = self
186 .verifier
187 .ok_or_else(|| ruc::eg!("verifier is required"))?;
188
189 let config = EngineConfig {
190 verifier,
191 pacemaker: self.pacemaker,
192 persistence: self.persistence,
193 };
194
195 Ok(ConsensusEngine::new(
196 state, store, network, app, signer, msg_rx, config,
197 ))
198 }
199}
200
201impl Default for ConsensusEngineBuilder {
202 fn default() -> Self {
203 Self::new()
204 }
205}
206
207impl ConsensusEngine {
208 pub fn new(
209 state: ConsensusState,
210 store: SharedBlockStore,
211 network: Box<dyn NetworkSink>,
212 app: Box<dyn Application>,
213 signer: Box<dyn Signer>,
214 msg_rx: mpsc::Receiver<(ValidatorId, ConsensusMessage)>,
215 config: EngineConfig,
216 ) -> Self {
217 let pc = config.pacemaker.unwrap_or_default();
218 Self {
219 state,
220 store,
221 network,
222 app,
223 signer,
224 verifier: config.verifier,
225 vote_collector: VoteCollector::new(),
226 pacemaker: Pacemaker::with_config(pc.clone()),
227 pacemaker_config: pc,
228 msg_rx,
229 status_senders: HashSet::new(),
230 current_view_qc: None,
231 pending_epoch: None,
232 persistence: config.persistence,
233 }
234 }
235
236 pub async fn run(mut self) {
239 if self.state.current_view.as_u64() <= 1 {
240 self.enter_genesis_view();
241 } else {
242 info!(
243 validator = %self.state.validator_id,
244 view = %self.state.current_view,
245 height = %self.state.last_committed_height,
246 "resuming from persisted state"
247 );
248 self.pacemaker.reset_timer();
249 }
250
251 loop {
252 let deadline = self.pacemaker.sleep_until_deadline();
253 tokio::pin!(deadline);
254
255 tokio::select! {
256 Some((sender, msg)) = self.msg_rx.recv() => {
257 if let Err(e) = self.handle_message(sender, msg) {
258 warn!(validator = %self.state.validator_id, error = %e, "error handling message");
259 }
260 }
261 _ = &mut deadline => {
262 self.handle_timeout();
263 }
264 }
265 }
266 }
267
268 fn enter_genesis_view(&mut self) {
269 let genesis_qc = QuorumCertificate {
271 block_hash: BlockHash::GENESIS,
272 view: ViewNumber::GENESIS,
273 aggregate_signature: AggregateSignature::new(
274 self.state.validator_set.validator_count(),
275 ),
276 };
277 self.state.highest_qc = Some(genesis_qc);
278
279 let view = ViewNumber(1);
280 view_protocol::enter_view(
281 &mut self.state,
282 view,
283 ViewEntryTrigger::Genesis,
284 self.network.as_ref(),
285 self.signer.as_ref(),
286 );
287 self.pacemaker.reset_timer();
288
289 if self.state.is_leader() {
291 self.state.step = ViewStep::WaitingForStatus;
292 self.try_propose();
294 }
295 }
296
297 fn try_propose(&mut self) {
298 let mut store = self.store.write().unwrap();
299 match view_protocol::propose(
300 &mut self.state,
301 store.as_mut(),
302 self.network.as_ref(),
303 self.app.as_ref(),
304 self.signer.as_ref(),
305 ) {
306 Ok(block) => {
307 drop(store);
308 self.leader_self_vote(block.hash);
310 }
311 Err(e) => {
312 warn!(
313 validator = %self.state.validator_id,
314 error = %e,
315 "failed to propose"
316 );
317 }
318 }
319 }
320
321 fn leader_self_vote(&mut self, block_hash: BlockHash) {
322 let vote_bytes = Vote::signing_bytes(self.state.current_view, &block_hash, VoteType::Vote);
323 let signature = self.signer.sign(&vote_bytes);
324 let vote = Vote {
325 block_hash,
326 view: self.state.current_view,
327 validator: self.state.validator_id,
328 signature,
329 vote_type: VoteType::Vote,
330 };
331 match self
332 .vote_collector
333 .add_vote(&self.state.validator_set, vote)
334 {
335 Ok(result) => {
336 self.handle_equivocation(&result);
337 if let Some(qc) = result.qc {
338 self.on_qc_formed(qc);
339 }
340 }
341 Err(e) => warn!(error = %e, "failed to add self vote"),
342 }
343 }
344
345 fn verify_message(&self, msg: &ConsensusMessage) -> bool {
349 let msg_view = match msg {
354 ConsensusMessage::Propose { .. } => None, ConsensusMessage::VoteMsg(v) | ConsensusMessage::Vote2Msg(v) => Some(v.view),
356 ConsensusMessage::Prepare { certificate, .. } => Some(certificate.view),
357 ConsensusMessage::Wish { target_view, .. } => Some(*target_view),
358 ConsensusMessage::TimeoutCert(tc) => Some(ViewNumber(tc.view.as_u64() + 1)),
359 ConsensusMessage::StatusCert { .. } => None,
360 };
361 if let Some(v) = msg_view
362 && v < self.state.current_view
363 {
364 return true; }
366
367 let vs = &self.state.validator_set;
368 match msg {
369 ConsensusMessage::Propose {
370 block,
371 justify,
372 signature,
373 ..
374 } => {
375 let proposer = vs.get(block.proposer);
376 let Some(vi) = proposer else {
377 warn!(proposer = %block.proposer, "propose from unknown validator");
378 return false;
379 };
380 let bytes = view_protocol::proposal_signing_bytes(block, justify);
381 if !self.verifier.verify(&vi.public_key, &bytes, signature) {
382 warn!(proposer = %block.proposer, "invalid proposal signature");
383 return false;
384 }
385 if justify.aggregate_signature.count() > 0 {
387 let qc_bytes =
388 Vote::signing_bytes(justify.view, &justify.block_hash, VoteType::Vote);
389 if !self
390 .verifier
391 .verify_aggregate(vs, &qc_bytes, &justify.aggregate_signature)
392 {
393 warn!(proposer = %block.proposer, "invalid justify QC aggregate signature");
394 return false;
395 }
396 }
397 true
398 }
399 ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
400 let Some(vi) = vs.get(vote.validator) else {
401 warn!(validator = %vote.validator, "vote from unknown validator");
402 return false;
403 };
404 let bytes = Vote::signing_bytes(vote.view, &vote.block_hash, vote.vote_type);
405 if !self
406 .verifier
407 .verify(&vi.public_key, &bytes, &vote.signature)
408 {
409 warn!(validator = %vote.validator, "invalid vote signature");
410 return false;
411 }
412 true
413 }
414 ConsensusMessage::Prepare {
415 certificate,
416 signature,
417 } => {
418 let leader = vs.leader_for_view(certificate.view);
420 let bytes = view_protocol::prepare_signing_bytes(certificate);
421 if !self.verifier.verify(&leader.public_key, &bytes, signature) {
422 warn!(view = %certificate.view, "invalid prepare signature");
423 return false;
424 }
425 let qc_bytes =
427 Vote::signing_bytes(certificate.view, &certificate.block_hash, VoteType::Vote);
428 if !self
429 .verifier
430 .verify_aggregate(vs, &qc_bytes, &certificate.aggregate_signature)
431 {
432 warn!(view = %certificate.view, "invalid QC aggregate signature");
433 return false;
434 }
435 true
436 }
437 ConsensusMessage::Wish {
438 target_view,
439 validator,
440 signature,
441 ..
442 } => {
443 let Some(vi) = vs.get(*validator) else {
444 warn!(validator = %validator, "wish from unknown validator");
445 return false;
446 };
447 let bytes = crate::pacemaker::wish_signing_bytes(*target_view);
448 if !self.verifier.verify(&vi.public_key, &bytes, signature) {
449 warn!(validator = %validator, "invalid wish signature");
450 return false;
451 }
452 true
453 }
454 ConsensusMessage::TimeoutCert(tc) => {
455 let bytes = crate::pacemaker::wish_signing_bytes(ViewNumber(tc.view.as_u64() + 1));
460 if !self
461 .verifier
462 .verify_aggregate(vs, &bytes, &tc.aggregate_signature)
463 {
464 warn!(view = %tc.view, "invalid TC aggregate signature");
465 return false;
466 }
467 true
468 }
469 ConsensusMessage::StatusCert {
470 locked_qc,
471 validator,
472 signature,
473 } => {
474 let Some(vi) = vs.get(*validator) else {
475 warn!(validator = %validator, "status from unknown validator");
476 return false;
477 };
478 let bytes = view_protocol::status_signing_bytes(self.state.current_view, locked_qc);
479 if !self.verifier.verify(&vi.public_key, &bytes, signature) {
480 warn!(validator = %validator, "invalid status signature");
481 return false;
482 }
483 true
484 }
485 }
486 }
487
488 fn handle_message(&mut self, _sender: ValidatorId, msg: ConsensusMessage) -> Result<()> {
489 if !self.verify_message(&msg) {
490 return Ok(());
491 }
492
493 match msg {
494 ConsensusMessage::Propose {
495 block,
496 justify,
497 double_cert,
498 signature: _,
499 } => {
500 let block = *block;
501 let justify = *justify;
502 let double_cert = double_cert.map(|dc| *dc);
503
504 if block.view > self.state.current_view {
506 if let Some(ref dc) = double_cert {
507 if dc.inner_qc.block_hash != dc.outer_qc.block_hash {
510 warn!("double cert inner/outer block_hash mismatch");
511 return Ok(());
512 }
513 let quorum = self.state.validator_set.quorum_threshold() as usize;
515 if dc.inner_qc.aggregate_signature.count() < quorum {
516 warn!("double cert inner QC insufficient signers");
517 return Ok(());
518 }
519 if dc.outer_qc.aggregate_signature.count() < quorum {
520 warn!("double cert outer QC insufficient signers");
521 return Ok(());
522 }
523 let inner_bytes = Vote::signing_bytes(
525 dc.inner_qc.view,
526 &dc.inner_qc.block_hash,
527 VoteType::Vote,
528 );
529 if !self.verifier.verify_aggregate(
530 &self.state.validator_set,
531 &inner_bytes,
532 &dc.inner_qc.aggregate_signature,
533 ) {
534 warn!("double cert inner QC signature invalid");
535 return Ok(());
536 }
537 let outer_bytes = Vote::signing_bytes(
539 dc.outer_qc.view,
540 &dc.outer_qc.block_hash,
541 VoteType::Vote2,
542 );
543 if !self.verifier.verify_aggregate(
544 &self.state.validator_set,
545 &outer_bytes,
546 &dc.outer_qc.aggregate_signature,
547 ) {
548 warn!("double cert outer QC signature invalid");
549 return Ok(());
550 }
551
552 let store = self.store.read().unwrap();
554 match try_commit(
555 dc,
556 store.as_ref(),
557 self.app.as_ref(),
558 &mut self.state.last_committed_height,
559 &self.state.current_epoch,
560 ) {
561 Ok(result) => {
562 if result.pending_epoch.is_some() {
563 self.pending_epoch = result.pending_epoch;
564 }
565 drop(store);
566 {
568 let mut s = self.store.write().unwrap();
569 for block in &result.committed_blocks {
570 s.put_commit_qc(block.height, result.commit_qc.clone());
571 }
572 s.flush();
573 }
574 }
575 Err(e) => {
576 warn!(error = %e, "try_commit failed during fast-forward");
577 drop(store);
578 }
579 }
580 self.state.highest_double_cert = Some(dc.clone());
581 self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()));
582 } else {
583 return Ok(());
584 }
585 } else if block.view < self.state.current_view {
586 if block.height > self.state.last_committed_height {
592 let expected = hotmint_crypto::compute_block_hash(&block);
594 if block.hash == expected {
595 let mut store = self.store.write().unwrap();
596 store.put_block(block);
597 }
598 }
599 return Ok(());
600 }
601
602 let mut store = self.store.write().unwrap();
603 let maybe_pending = view_protocol::on_proposal(
604 &mut self.state,
605 view_protocol::ProposalData {
606 block,
607 justify,
608 double_cert,
609 },
610 store.as_mut(),
611 self.network.as_ref(),
612 self.app.as_ref(),
613 self.signer.as_ref(),
614 )
615 .c(d!())?;
616 drop(store);
617
618 if let Some(epoch) = maybe_pending {
619 self.pending_epoch = Some(epoch);
620 }
621 }
622
623 ConsensusMessage::VoteMsg(vote) => {
624 if vote.view != self.state.current_view {
625 return Ok(());
626 }
627 if !self.state.is_leader() {
628 return Ok(());
629 }
630 if vote.vote_type != VoteType::Vote {
631 return Ok(());
632 }
633
634 let result = self
635 .vote_collector
636 .add_vote(&self.state.validator_set, vote)
637 .c(d!())?;
638 self.handle_equivocation(&result);
639 if let Some(qc) = result.qc {
640 self.on_qc_formed(qc);
641 }
642 }
643
644 ConsensusMessage::Prepare {
645 certificate,
646 signature: _,
647 } => {
648 if certificate.view < self.state.current_view {
649 return Ok(());
650 }
651 if certificate.view == self.state.current_view {
652 view_protocol::on_prepare(
653 &mut self.state,
654 certificate,
655 self.network.as_ref(),
656 self.signer.as_ref(),
657 );
658 }
659 }
660
661 ConsensusMessage::Vote2Msg(vote) => {
662 if vote.vote_type != VoteType::Vote2 {
663 return Ok(());
664 }
665 if vote.view != self.state.current_view {
666 return Ok(());
667 }
668 let result = self
669 .vote_collector
670 .add_vote(&self.state.validator_set, vote)
671 .c(d!())?;
672 self.handle_equivocation(&result);
673 if let Some(outer_qc) = result.qc {
674 self.on_double_cert_formed(outer_qc);
675 }
676 }
677
678 ConsensusMessage::Wish {
679 target_view,
680 validator,
681 highest_qc,
682 signature,
683 } => {
684 if let Some(ref qc) = highest_qc
686 && qc.aggregate_signature.count() > 0
687 {
688 let qc_bytes = Vote::signing_bytes(qc.view, &qc.block_hash, VoteType::Vote);
689 if !self.verifier.verify_aggregate(
690 &self.state.validator_set,
691 &qc_bytes,
692 &qc.aggregate_signature,
693 ) {
694 warn!(validator = %validator, "wish carries invalid highest_qc");
695 return Ok(());
696 }
697 }
698
699 if let Some(tc) = self.pacemaker.add_wish(
700 &self.state.validator_set,
701 target_view,
702 validator,
703 highest_qc,
704 signature,
705 ) {
706 info!(
707 validator = %self.state.validator_id,
708 view = %tc.view,
709 "TC formed, advancing view"
710 );
711 self.network
712 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
713 self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
714 }
715 }
716
717 ConsensusMessage::TimeoutCert(tc) => {
718 if self.pacemaker.should_relay_tc(&tc) {
719 self.network
720 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
721 }
722 let new_view = ViewNumber(tc.view.as_u64() + 1);
723 if new_view > self.state.current_view {
724 self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
725 }
726 }
727
728 ConsensusMessage::StatusCert {
729 locked_qc,
730 validator,
731 signature: _,
732 } => {
733 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
734 if let Some(ref qc) = locked_qc {
735 self.state.update_highest_qc(qc);
736 }
737 self.status_senders.insert(validator);
738 let status_power: u64 = self
739 .status_senders
740 .iter()
741 .map(|v| self.state.validator_set.power_of(*v))
742 .sum();
743 let total_power =
745 status_power + self.state.validator_set.power_of(self.state.validator_id);
746 if total_power >= self.state.validator_set.quorum_threshold() {
747 self.try_propose();
748 }
749 }
750 }
751 }
752 Ok(())
753 }
754
755 fn handle_equivocation(&self, result: &crate::vote_collector::VoteResult) {
756 if let Some(ref proof) = result.equivocation {
757 warn!(
758 validator = %proof.validator,
759 view = %proof.view,
760 "equivocation detected!"
761 );
762 if let Err(e) = self.app.on_evidence(proof) {
763 warn!(error = %e, "on_evidence callback failed");
764 }
765 }
766 }
767
768 fn on_qc_formed(&mut self, qc: QuorumCertificate) {
769 self.current_view_qc = Some(qc.clone());
771
772 view_protocol::on_votes_collected(
773 &mut self.state,
774 qc.clone(),
775 self.network.as_ref(),
776 self.signer.as_ref(),
777 );
778
779 let vote_bytes =
781 Vote::signing_bytes(self.state.current_view, &qc.block_hash, VoteType::Vote2);
782 let signature = self.signer.sign(&vote_bytes);
783 let vote = Vote {
784 block_hash: qc.block_hash,
785 view: self.state.current_view,
786 validator: self.state.validator_id,
787 signature,
788 vote_type: VoteType::Vote2,
789 };
790
791 self.state.update_locked_qc(&qc);
793
794 let next_leader_id =
795 leader::next_leader(&self.state.validator_set, self.state.current_view);
796 if next_leader_id == self.state.validator_id {
797 match self
799 .vote_collector
800 .add_vote(&self.state.validator_set, vote)
801 {
802 Ok(result) => {
803 self.handle_equivocation(&result);
804 if let Some(outer_qc) = result.qc {
805 self.on_double_cert_formed(outer_qc);
806 }
807 }
808 Err(e) => warn!(error = %e, "failed to add self vote2"),
809 }
810 } else {
811 self.network
812 .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
813 }
814 }
815
816 fn on_double_cert_formed(&mut self, outer_qc: QuorumCertificate) {
817 let inner_qc = match self.current_view_qc.take() {
819 Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
820 _ => {
821 match &self.state.locked_qc {
823 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
824 _ => match &self.state.highest_qc {
825 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
826 _ => {
827 warn!(
828 validator = %self.state.validator_id,
829 "double cert formed but can't find matching inner QC"
830 );
831 return;
832 }
833 },
834 }
835 }
836 };
837
838 let dc = DoubleCertificate { inner_qc, outer_qc };
839
840 info!(
841 validator = %self.state.validator_id,
842 view = %self.state.current_view,
843 hash = %dc.inner_qc.block_hash,
844 "double certificate formed, committing"
845 );
846
847 {
849 let store = self.store.read().unwrap();
850 match try_commit(
851 &dc,
852 store.as_ref(),
853 self.app.as_ref(),
854 &mut self.state.last_committed_height,
855 &self.state.current_epoch,
856 ) {
857 Ok(result) => {
858 if result.pending_epoch.is_some() {
859 self.pending_epoch = result.pending_epoch;
860 }
861 drop(store);
862 {
863 let mut s = self.store.write().unwrap();
864 for block in &result.committed_blocks {
865 s.put_commit_qc(block.height, result.commit_qc.clone());
866 }
867 s.flush();
868 }
869 }
870 Err(e) => {
871 warn!(error = %e, "try_commit failed in double cert handler");
872 drop(store);
873 }
874 }
875 }
876
877 self.state.highest_double_cert = Some(dc.clone());
878
879 self.advance_view(ViewEntryTrigger::DoubleCert(dc));
881 }
882
883 fn handle_timeout(&mut self) {
884 info!(
885 validator = %self.state.validator_id,
886 view = %self.state.current_view,
887 "view timeout, sending wish"
888 );
889
890 let wish = self.pacemaker.build_wish(
891 self.state.current_view,
892 self.state.validator_id,
893 self.state.highest_qc.clone(),
894 self.signer.as_ref(),
895 );
896
897 self.network.broadcast(wish.clone());
898
899 if let ConsensusMessage::Wish {
901 target_view,
902 validator,
903 highest_qc,
904 signature,
905 } = wish
906 && let Some(tc) = self.pacemaker.add_wish(
907 &self.state.validator_set,
908 target_view,
909 validator,
910 highest_qc,
911 signature,
912 )
913 {
914 self.network
915 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
916 self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
917 return;
918 }
919
920 self.pacemaker.on_timeout();
922 }
923
924 fn persist_state(&mut self) {
925 if let Some(p) = self.persistence.as_mut() {
926 p.save_current_view(self.state.current_view);
927 if let Some(ref qc) = self.state.locked_qc {
928 p.save_locked_qc(qc);
929 }
930 if let Some(ref qc) = self.state.highest_qc {
931 p.save_highest_qc(qc);
932 }
933 p.save_last_committed_height(self.state.last_committed_height);
934 p.save_current_epoch(&self.state.current_epoch);
935 p.flush();
936 }
937 }
938
939 fn advance_view(&mut self, trigger: ViewEntryTrigger) {
940 let new_view = match &trigger {
941 ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
942 ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
943 ViewEntryTrigger::Genesis => ViewNumber(1),
944 };
945 self.advance_view_to(new_view, trigger);
946 }
947
948 fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
949 if new_view <= self.state.current_view {
950 return;
951 }
952
953 let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
955
956 self.vote_collector.clear_view(self.state.current_view);
957 self.vote_collector.prune_before(self.state.current_view);
958 self.pacemaker.clear_view(self.state.current_view);
959 self.status_senders.clear();
960 self.current_view_qc = None;
961
962 if let Some(ref epoch) = self.pending_epoch
966 && new_view >= epoch.start_view
967 {
968 let new_epoch = self.pending_epoch.take().unwrap();
969 info!(
970 validator = %self.state.validator_id,
971 old_epoch = %self.state.current_epoch.number,
972 new_epoch = %new_epoch.number,
973 start_view = %new_epoch.start_view,
974 validators = new_epoch.validator_set.validator_count(),
975 "epoch transition"
976 );
977 self.state.validator_set = new_epoch.validator_set.clone();
978 self.state.current_epoch = new_epoch;
979 self.network.on_epoch_change(&self.state.validator_set);
981 self.vote_collector = VoteCollector::new();
983 self.pacemaker = Pacemaker::with_config(self.pacemaker_config.clone());
984 }
985
986 view_protocol::enter_view(
987 &mut self.state,
988 new_view,
989 trigger,
990 self.network.as_ref(),
991 self.signer.as_ref(),
992 );
993
994 if is_progress {
995 self.pacemaker.reset_on_progress();
996 } else {
997 self.pacemaker.reset_timer();
998 }
999
1000 self.persist_state();
1001
1002 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
1008 self.try_propose();
1009 }
1010 }
1011}