1use ruc::*;
2
3use std::collections::HashSet;
4use std::sync::{Arc, RwLock};
5
6use crate::application::Application;
7use crate::commit::try_commit;
8use crate::leader;
9use crate::network::NetworkSink;
10use crate::pacemaker::{Pacemaker, PacemakerConfig};
11use crate::state::{ConsensusState, ViewStep};
12use crate::store::BlockStore;
13use crate::view_protocol::{self, ViewEntryTrigger};
14use crate::vote_collector::VoteCollector;
15
16use hotmint_types::epoch::Epoch;
17use hotmint_types::vote::VoteType;
18use hotmint_types::*;
19use tokio::sync::mpsc;
20use tracing::{info, warn};
21
/// Block store handle shared between the engine and other components.
///
/// The boxed [`BlockStore`] sits behind a std `RwLock`; the engine takes the read lock
/// while committing and the write lock when storing blocks or commit QCs.
pub type SharedBlockStore = Arc<RwLock<Box<dyn BlockStore>>>;
24
/// Sink for the pieces of consensus state the engine saves after every view change.
///
/// `ConsensusEngine::persist_state` calls the `save_*` methods and then `flush`, in the
/// order they are declared here. The QC savers are only called when the corresponding
/// QC is present in the engine state.
pub trait StatePersistence: Send {
    /// Records the view the engine has just entered.
    fn save_current_view(&mut self, view: ViewNumber);
    /// Records the engine's current locked QC.
    fn save_locked_qc(&mut self, qc: &QuorumCertificate);
    /// Records the highest QC the engine knows about.
    fn save_highest_qc(&mut self, qc: &QuorumCertificate);
    /// Records the height of the last committed block.
    fn save_last_committed_height(&mut self, height: Height);
    /// Records the epoch the engine is currently operating in.
    fn save_current_epoch(&mut self, epoch: &Epoch);
    /// Called once after a batch of `save_*` calls.
    // NOTE(review): presumably makes the saved values durable; takes `&self`, so any
    // buffering must use interior mutability — confirm against implementors.
    fn flush(&self);
}
34
/// Single-validator consensus driver: consumes inbound messages and view timeouts and
/// drives proposing, voting, committing and view changes.
pub struct ConsensusEngine {
    /// Mutable consensus state (current view, QCs, validator set, epoch, step).
    state: ConsensusState,
    /// Shared block store; locked for reading during commit and for writing on updates.
    store: SharedBlockStore,
    /// Outbound network handle used for `broadcast` / `send_to`.
    network: Box<dyn NetworkSink>,
    /// Application callbacks (passed to propose/commit, receives equivocation evidence).
    app: Box<dyn Application>,
    /// Signs this validator's votes and protocol messages.
    signer: Box<dyn Signer>,
    /// Verifies individual and aggregate signatures on inbound messages.
    verifier: Box<dyn Verifier>,
    /// Accumulates votes and reports QCs / equivocation.
    vote_collector: VoteCollector,
    /// View timer and wish aggregation.
    pacemaker: Pacemaker,
    /// Configuration kept so the pacemaker can be rebuilt on an epoch transition.
    pacemaker_config: PacemakerConfig,
    /// Inbound `(sender, message)` channel.
    msg_rx: mpsc::Receiver<(ValidatorId, ConsensusMessage)>,
    /// Validators whose `StatusCert` was counted in the current view; cleared on view change.
    status_senders: HashSet<ValidatorId>,
    /// QC formed from first-round votes in the current view; consumed as the inner QC
    /// when a double certificate forms.
    current_view_qc: Option<QuorumCertificate>,
    /// Epoch waiting to be applied once the engine reaches its `start_view`.
    pending_epoch: Option<Epoch>,
    /// Optional persistence sink invoked after each view change.
    persistence: Option<Box<dyn StatePersistence>>,
}
55
/// Optional and pluggable parts of a [`ConsensusEngine`], passed to `ConsensusEngine::new`.
pub struct EngineConfig {
    /// Signature verifier used for every inbound message.
    pub verifier: Box<dyn Verifier>,
    /// Pacemaker settings; `None` selects `PacemakerConfig::default()`.
    pub pacemaker: Option<PacemakerConfig>,
    /// Where to persist consensus state after view changes; `None` disables persistence.
    pub persistence: Option<Box<dyn StatePersistence>>,
}
62
63impl ConsensusEngine {
64 pub fn new(
65 state: ConsensusState,
66 store: SharedBlockStore,
67 network: Box<dyn NetworkSink>,
68 app: Box<dyn Application>,
69 signer: Box<dyn Signer>,
70 msg_rx: mpsc::Receiver<(ValidatorId, ConsensusMessage)>,
71 config: EngineConfig,
72 ) -> Self {
73 let pc = config.pacemaker.unwrap_or_default();
74 Self {
75 state,
76 store,
77 network,
78 app,
79 signer,
80 verifier: config.verifier,
81 vote_collector: VoteCollector::new(),
82 pacemaker: Pacemaker::with_config(pc.clone()),
83 pacemaker_config: pc,
84 msg_rx,
85 status_senders: HashSet::new(),
86 current_view_qc: None,
87 pending_epoch: None,
88 persistence: config.persistence,
89 }
90 }
91
92 pub async fn run(mut self) {
95 if self.state.current_view.as_u64() <= 1 {
96 self.enter_genesis_view();
97 } else {
98 info!(
99 validator = %self.state.validator_id,
100 view = %self.state.current_view,
101 height = %self.state.last_committed_height,
102 "resuming from persisted state"
103 );
104 self.pacemaker.reset_timer();
105 }
106
107 loop {
108 let deadline = self.pacemaker.sleep_until_deadline();
109 tokio::pin!(deadline);
110
111 tokio::select! {
112 Some((sender, msg)) = self.msg_rx.recv() => {
113 if let Err(e) = self.handle_message(sender, msg) {
114 warn!(validator = %self.state.validator_id, error = %e, "error handling message");
115 }
116 }
117 _ = &mut deadline => {
118 self.handle_timeout();
119 }
120 }
121 }
122 }
123
124 fn enter_genesis_view(&mut self) {
125 let genesis_qc = QuorumCertificate {
127 block_hash: BlockHash::GENESIS,
128 view: ViewNumber::GENESIS,
129 aggregate_signature: AggregateSignature::new(
130 self.state.validator_set.validator_count(),
131 ),
132 };
133 self.state.highest_qc = Some(genesis_qc);
134
135 let view = ViewNumber(1);
136 view_protocol::enter_view(
137 &mut self.state,
138 view,
139 ViewEntryTrigger::Genesis,
140 self.network.as_ref(),
141 self.signer.as_ref(),
142 );
143 self.pacemaker.reset_timer();
144
145 if self.state.is_leader() {
147 self.state.step = ViewStep::WaitingForStatus;
148 self.try_propose();
150 }
151 }
152
153 fn try_propose(&mut self) {
154 let mut store = self.store.write().unwrap();
155 match view_protocol::propose(
156 &mut self.state,
157 store.as_mut(),
158 self.network.as_ref(),
159 self.app.as_ref(),
160 self.signer.as_ref(),
161 ) {
162 Ok(block) => {
163 drop(store);
164 self.leader_self_vote(block.hash);
166 }
167 Err(e) => {
168 warn!(
169 validator = %self.state.validator_id,
170 error = %e,
171 "failed to propose"
172 );
173 }
174 }
175 }
176
177 fn leader_self_vote(&mut self, block_hash: BlockHash) {
178 let vote_bytes = Vote::signing_bytes(self.state.current_view, &block_hash, VoteType::Vote);
179 let signature = self.signer.sign(&vote_bytes);
180 let vote = Vote {
181 block_hash,
182 view: self.state.current_view,
183 validator: self.state.validator_id,
184 signature,
185 vote_type: VoteType::Vote,
186 };
187 match self
188 .vote_collector
189 .add_vote(&self.state.validator_set, vote)
190 {
191 Ok(result) => {
192 self.handle_equivocation(&result);
193 if let Some(qc) = result.qc {
194 self.on_qc_formed(qc);
195 }
196 }
197 Err(e) => warn!(error = %e, "failed to add self vote"),
198 }
199 }
200
201 fn verify_message(&self, msg: &ConsensusMessage) -> bool {
205 let msg_view = match msg {
210 ConsensusMessage::Propose { .. } => None, ConsensusMessage::VoteMsg(v) | ConsensusMessage::Vote2Msg(v) => Some(v.view),
212 ConsensusMessage::Prepare { certificate, .. } => Some(certificate.view),
213 ConsensusMessage::Wish { target_view, .. } => Some(*target_view),
214 ConsensusMessage::TimeoutCert(tc) => Some(ViewNumber(tc.view.as_u64() + 1)),
215 ConsensusMessage::StatusCert { .. } => None,
216 };
217 if let Some(v) = msg_view
218 && v < self.state.current_view
219 {
220 return true; }
222
223 let vs = &self.state.validator_set;
224 match msg {
225 ConsensusMessage::Propose {
226 block,
227 justify,
228 signature,
229 ..
230 } => {
231 let proposer = vs.get(block.proposer);
232 let Some(vi) = proposer else {
233 warn!(proposer = %block.proposer, "propose from unknown validator");
234 return false;
235 };
236 let bytes = view_protocol::proposal_signing_bytes(block, justify);
237 if !self.verifier.verify(&vi.public_key, &bytes, signature) {
238 warn!(proposer = %block.proposer, "invalid proposal signature");
239 return false;
240 }
241 if justify.aggregate_signature.count() > 0 {
243 let qc_bytes =
244 Vote::signing_bytes(justify.view, &justify.block_hash, VoteType::Vote);
245 if !self
246 .verifier
247 .verify_aggregate(vs, &qc_bytes, &justify.aggregate_signature)
248 {
249 warn!(proposer = %block.proposer, "invalid justify QC aggregate signature");
250 return false;
251 }
252 }
253 true
254 }
255 ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
256 let Some(vi) = vs.get(vote.validator) else {
257 warn!(validator = %vote.validator, "vote from unknown validator");
258 return false;
259 };
260 let bytes = Vote::signing_bytes(vote.view, &vote.block_hash, vote.vote_type);
261 if !self
262 .verifier
263 .verify(&vi.public_key, &bytes, &vote.signature)
264 {
265 warn!(validator = %vote.validator, "invalid vote signature");
266 return false;
267 }
268 true
269 }
270 ConsensusMessage::Prepare {
271 certificate,
272 signature,
273 } => {
274 let leader = vs.leader_for_view(certificate.view);
276 let bytes = view_protocol::prepare_signing_bytes(certificate);
277 if !self.verifier.verify(&leader.public_key, &bytes, signature) {
278 warn!(view = %certificate.view, "invalid prepare signature");
279 return false;
280 }
281 let qc_bytes =
283 Vote::signing_bytes(certificate.view, &certificate.block_hash, VoteType::Vote);
284 if !self
285 .verifier
286 .verify_aggregate(vs, &qc_bytes, &certificate.aggregate_signature)
287 {
288 warn!(view = %certificate.view, "invalid QC aggregate signature");
289 return false;
290 }
291 true
292 }
293 ConsensusMessage::Wish {
294 target_view,
295 validator,
296 signature,
297 ..
298 } => {
299 let Some(vi) = vs.get(*validator) else {
300 warn!(validator = %validator, "wish from unknown validator");
301 return false;
302 };
303 let bytes = crate::pacemaker::wish_signing_bytes(*target_view);
304 if !self.verifier.verify(&vi.public_key, &bytes, signature) {
305 warn!(validator = %validator, "invalid wish signature");
306 return false;
307 }
308 true
309 }
310 ConsensusMessage::TimeoutCert(tc) => {
311 let bytes = crate::pacemaker::wish_signing_bytes(ViewNumber(tc.view.as_u64() + 1));
316 if !self
317 .verifier
318 .verify_aggregate(vs, &bytes, &tc.aggregate_signature)
319 {
320 warn!(view = %tc.view, "invalid TC aggregate signature");
321 return false;
322 }
323 true
324 }
325 ConsensusMessage::StatusCert {
326 locked_qc,
327 validator,
328 signature,
329 } => {
330 let Some(vi) = vs.get(*validator) else {
331 warn!(validator = %validator, "status from unknown validator");
332 return false;
333 };
334 let bytes = view_protocol::status_signing_bytes(self.state.current_view, locked_qc);
335 if !self.verifier.verify(&vi.public_key, &bytes, signature) {
336 warn!(validator = %validator, "invalid status signature");
337 return false;
338 }
339 true
340 }
341 }
342 }
343
344 fn handle_message(&mut self, _sender: ValidatorId, msg: ConsensusMessage) -> Result<()> {
345 if !self.verify_message(&msg) {
346 return Ok(());
347 }
348
349 match msg {
350 ConsensusMessage::Propose {
351 block,
352 justify,
353 double_cert,
354 signature: _,
355 } => {
356 let block = *block;
357 let justify = *justify;
358 let double_cert = double_cert.map(|dc| *dc);
359
360 if block.view > self.state.current_view {
362 if let Some(ref dc) = double_cert {
363 if dc.inner_qc.block_hash != dc.outer_qc.block_hash {
366 warn!("double cert inner/outer block_hash mismatch");
367 return Ok(());
368 }
369 let quorum = self.state.validator_set.quorum_threshold() as usize;
371 if dc.inner_qc.aggregate_signature.count() < quorum {
372 warn!("double cert inner QC insufficient signers");
373 return Ok(());
374 }
375 if dc.outer_qc.aggregate_signature.count() < quorum {
376 warn!("double cert outer QC insufficient signers");
377 return Ok(());
378 }
379 let inner_bytes = Vote::signing_bytes(
381 dc.inner_qc.view,
382 &dc.inner_qc.block_hash,
383 VoteType::Vote,
384 );
385 if !self.verifier.verify_aggregate(
386 &self.state.validator_set,
387 &inner_bytes,
388 &dc.inner_qc.aggregate_signature,
389 ) {
390 warn!("double cert inner QC signature invalid");
391 return Ok(());
392 }
393 let outer_bytes = Vote::signing_bytes(
395 dc.outer_qc.view,
396 &dc.outer_qc.block_hash,
397 VoteType::Vote2,
398 );
399 if !self.verifier.verify_aggregate(
400 &self.state.validator_set,
401 &outer_bytes,
402 &dc.outer_qc.aggregate_signature,
403 ) {
404 warn!("double cert outer QC signature invalid");
405 return Ok(());
406 }
407
408 let store = self.store.read().unwrap();
410 match try_commit(
411 dc,
412 store.as_ref(),
413 self.app.as_ref(),
414 &mut self.state.last_committed_height,
415 &self.state.current_epoch,
416 ) {
417 Ok(result) => {
418 if result.pending_epoch.is_some() {
419 self.pending_epoch = result.pending_epoch;
420 }
421 drop(store);
422 {
424 let mut s = self.store.write().unwrap();
425 for block in &result.committed_blocks {
426 s.put_commit_qc(block.height, result.commit_qc.clone());
427 }
428 s.flush();
429 }
430 }
431 Err(e) => {
432 warn!(error = %e, "try_commit failed during fast-forward");
433 drop(store);
434 }
435 }
436 self.state.highest_double_cert = Some(dc.clone());
437 self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()));
438 } else {
439 return Ok(());
440 }
441 } else if block.view < self.state.current_view {
442 if block.height > self.state.last_committed_height {
448 let expected = hotmint_crypto::hash_block(&block);
450 if block.hash == expected {
451 let mut store = self.store.write().unwrap();
452 store.put_block(block);
453 }
454 }
455 return Ok(());
456 }
457
458 let mut store = self.store.write().unwrap();
459 let maybe_pending = view_protocol::on_proposal(
460 &mut self.state,
461 view_protocol::ProposalData {
462 block,
463 justify,
464 double_cert,
465 },
466 store.as_mut(),
467 self.network.as_ref(),
468 self.app.as_ref(),
469 self.signer.as_ref(),
470 )
471 .c(d!())?;
472 drop(store);
473
474 if let Some(epoch) = maybe_pending {
475 self.pending_epoch = Some(epoch);
476 }
477 }
478
479 ConsensusMessage::VoteMsg(vote) => {
480 if vote.view != self.state.current_view {
481 return Ok(());
482 }
483 if !self.state.is_leader() {
484 return Ok(());
485 }
486 if vote.vote_type != VoteType::Vote {
487 return Ok(());
488 }
489
490 let result = self
491 .vote_collector
492 .add_vote(&self.state.validator_set, vote)
493 .c(d!())?;
494 self.handle_equivocation(&result);
495 if let Some(qc) = result.qc {
496 self.on_qc_formed(qc);
497 }
498 }
499
500 ConsensusMessage::Prepare {
501 certificate,
502 signature: _,
503 } => {
504 if certificate.view < self.state.current_view {
505 return Ok(());
506 }
507 if certificate.view == self.state.current_view {
508 view_protocol::on_prepare(
509 &mut self.state,
510 certificate,
511 self.network.as_ref(),
512 self.signer.as_ref(),
513 );
514 }
515 }
516
517 ConsensusMessage::Vote2Msg(vote) => {
518 if vote.vote_type != VoteType::Vote2 {
519 return Ok(());
520 }
521 if vote.view != self.state.current_view {
522 return Ok(());
523 }
524 let result = self
525 .vote_collector
526 .add_vote(&self.state.validator_set, vote)
527 .c(d!())?;
528 self.handle_equivocation(&result);
529 if let Some(outer_qc) = result.qc {
530 self.on_double_cert_formed(outer_qc);
531 }
532 }
533
534 ConsensusMessage::Wish {
535 target_view,
536 validator,
537 highest_qc,
538 signature,
539 } => {
540 if let Some(ref qc) = highest_qc
542 && qc.aggregate_signature.count() > 0
543 {
544 let qc_bytes = Vote::signing_bytes(qc.view, &qc.block_hash, VoteType::Vote);
545 if !self.verifier.verify_aggregate(
546 &self.state.validator_set,
547 &qc_bytes,
548 &qc.aggregate_signature,
549 ) {
550 warn!(validator = %validator, "wish carries invalid highest_qc");
551 return Ok(());
552 }
553 }
554
555 if let Some(tc) = self.pacemaker.add_wish(
556 &self.state.validator_set,
557 target_view,
558 validator,
559 highest_qc,
560 signature,
561 ) {
562 info!(
563 validator = %self.state.validator_id,
564 view = %tc.view,
565 "TC formed, advancing view"
566 );
567 self.network
568 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
569 self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
570 }
571 }
572
573 ConsensusMessage::TimeoutCert(tc) => {
574 if self.pacemaker.should_relay_tc(&tc) {
575 self.network
576 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
577 }
578 let new_view = ViewNumber(tc.view.as_u64() + 1);
579 if new_view > self.state.current_view {
580 self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
581 }
582 }
583
584 ConsensusMessage::StatusCert {
585 locked_qc,
586 validator,
587 signature: _,
588 } => {
589 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
590 if let Some(ref qc) = locked_qc {
591 self.state.update_highest_qc(qc);
592 }
593 self.status_senders.insert(validator);
594 let status_power: u64 = self
595 .status_senders
596 .iter()
597 .map(|v| self.state.validator_set.power_of(*v))
598 .sum();
599 let total_power =
601 status_power + self.state.validator_set.power_of(self.state.validator_id);
602 if total_power >= self.state.validator_set.quorum_threshold() {
603 self.try_propose();
604 }
605 }
606 }
607 }
608 Ok(())
609 }
610
611 fn handle_equivocation(&self, result: &crate::vote_collector::VoteResult) {
612 if let Some(ref proof) = result.equivocation {
613 warn!(
614 validator = %proof.validator,
615 view = %proof.view,
616 "equivocation detected!"
617 );
618 if let Err(e) = self.app.on_evidence(proof) {
619 warn!(error = %e, "on_evidence callback failed");
620 }
621 }
622 }
623
624 fn on_qc_formed(&mut self, qc: QuorumCertificate) {
625 self.current_view_qc = Some(qc.clone());
627
628 view_protocol::on_votes_collected(
629 &mut self.state,
630 qc.clone(),
631 self.network.as_ref(),
632 self.signer.as_ref(),
633 );
634
635 let vote_bytes =
637 Vote::signing_bytes(self.state.current_view, &qc.block_hash, VoteType::Vote2);
638 let signature = self.signer.sign(&vote_bytes);
639 let vote = Vote {
640 block_hash: qc.block_hash,
641 view: self.state.current_view,
642 validator: self.state.validator_id,
643 signature,
644 vote_type: VoteType::Vote2,
645 };
646
647 self.state.update_locked_qc(&qc);
649
650 let next_leader_id =
651 leader::next_leader(&self.state.validator_set, self.state.current_view);
652 if next_leader_id == self.state.validator_id {
653 match self
655 .vote_collector
656 .add_vote(&self.state.validator_set, vote)
657 {
658 Ok(result) => {
659 self.handle_equivocation(&result);
660 if let Some(outer_qc) = result.qc {
661 self.on_double_cert_formed(outer_qc);
662 }
663 }
664 Err(e) => warn!(error = %e, "failed to add self vote2"),
665 }
666 } else {
667 self.network
668 .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
669 }
670 }
671
672 fn on_double_cert_formed(&mut self, outer_qc: QuorumCertificate) {
673 let inner_qc = match self.current_view_qc.take() {
675 Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
676 _ => {
677 match &self.state.locked_qc {
679 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
680 _ => match &self.state.highest_qc {
681 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
682 _ => {
683 warn!(
684 validator = %self.state.validator_id,
685 "double cert formed but can't find matching inner QC"
686 );
687 return;
688 }
689 },
690 }
691 }
692 };
693
694 let dc = DoubleCertificate { inner_qc, outer_qc };
695
696 info!(
697 validator = %self.state.validator_id,
698 view = %self.state.current_view,
699 hash = %dc.inner_qc.block_hash,
700 "double certificate formed, committing"
701 );
702
703 {
705 let store = self.store.read().unwrap();
706 match try_commit(
707 &dc,
708 store.as_ref(),
709 self.app.as_ref(),
710 &mut self.state.last_committed_height,
711 &self.state.current_epoch,
712 ) {
713 Ok(result) => {
714 if result.pending_epoch.is_some() {
715 self.pending_epoch = result.pending_epoch;
716 }
717 drop(store);
718 {
719 let mut s = self.store.write().unwrap();
720 for block in &result.committed_blocks {
721 s.put_commit_qc(block.height, result.commit_qc.clone());
722 }
723 s.flush();
724 }
725 }
726 Err(e) => {
727 warn!(error = %e, "try_commit failed in double cert handler");
728 drop(store);
729 }
730 }
731 }
732
733 self.state.highest_double_cert = Some(dc.clone());
734
735 self.advance_view(ViewEntryTrigger::DoubleCert(dc));
737 }
738
739 fn handle_timeout(&mut self) {
740 info!(
741 validator = %self.state.validator_id,
742 view = %self.state.current_view,
743 "view timeout, sending wish"
744 );
745
746 let wish = self.pacemaker.build_wish(
747 self.state.current_view,
748 self.state.validator_id,
749 self.state.highest_qc.clone(),
750 self.signer.as_ref(),
751 );
752
753 self.network.broadcast(wish.clone());
754
755 if let ConsensusMessage::Wish {
757 target_view,
758 validator,
759 highest_qc,
760 signature,
761 } = wish
762 && let Some(tc) = self.pacemaker.add_wish(
763 &self.state.validator_set,
764 target_view,
765 validator,
766 highest_qc,
767 signature,
768 )
769 {
770 self.network
771 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
772 self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
773 return;
774 }
775
776 self.pacemaker.on_timeout();
778 }
779
780 fn persist_state(&mut self) {
781 if let Some(p) = self.persistence.as_mut() {
782 p.save_current_view(self.state.current_view);
783 if let Some(ref qc) = self.state.locked_qc {
784 p.save_locked_qc(qc);
785 }
786 if let Some(ref qc) = self.state.highest_qc {
787 p.save_highest_qc(qc);
788 }
789 p.save_last_committed_height(self.state.last_committed_height);
790 p.save_current_epoch(&self.state.current_epoch);
791 p.flush();
792 }
793 }
794
795 fn advance_view(&mut self, trigger: ViewEntryTrigger) {
796 let new_view = match &trigger {
797 ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
798 ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
799 ViewEntryTrigger::Genesis => ViewNumber(1),
800 };
801 self.advance_view_to(new_view, trigger);
802 }
803
804 fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
805 if new_view <= self.state.current_view {
806 return;
807 }
808
809 let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
811
812 self.vote_collector.clear_view(self.state.current_view);
813 self.vote_collector.prune_before(self.state.current_view);
814 self.pacemaker.clear_view(self.state.current_view);
815 self.status_senders.clear();
816 self.current_view_qc = None;
817
818 if let Some(ref epoch) = self.pending_epoch
822 && new_view >= epoch.start_view
823 {
824 let new_epoch = self.pending_epoch.take().unwrap();
825 info!(
826 validator = %self.state.validator_id,
827 old_epoch = %self.state.current_epoch.number,
828 new_epoch = %new_epoch.number,
829 start_view = %new_epoch.start_view,
830 validators = new_epoch.validator_set.validator_count(),
831 "epoch transition"
832 );
833 self.state.validator_set = new_epoch.validator_set.clone();
834 self.state.current_epoch = new_epoch;
835 self.network.on_epoch_change(&self.state.validator_set);
837 self.vote_collector = VoteCollector::new();
839 self.pacemaker = Pacemaker::with_config(self.pacemaker_config.clone());
840 }
841
842 view_protocol::enter_view(
843 &mut self.state,
844 new_view,
845 trigger,
846 self.network.as_ref(),
847 self.signer.as_ref(),
848 );
849
850 if is_progress {
851 self.pacemaker.reset_on_progress();
852 } else {
853 self.pacemaker.reset_timer();
854 }
855
856 self.persist_state();
857
858 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
864 self.try_propose();
865 }
866 }
867}