Skip to main content

hotmint_consensus/
engine.rs

1use ruc::*;
2
3use std::collections::HashSet;
4use std::sync::{Arc, RwLock};
5
6use crate::application::Application;
7use crate::commit::try_commit;
8use crate::leader;
9use crate::network::NetworkSink;
10use crate::pacemaker::{Pacemaker, PacemakerConfig};
11use crate::state::{ConsensusState, ViewStep};
12use crate::store::BlockStore;
13use crate::view_protocol::{self, ViewEntryTrigger};
14use crate::vote_collector::VoteCollector;
15
16use hotmint_types::epoch::Epoch;
17use hotmint_types::vote::VoteType;
18use hotmint_types::*;
19use tokio::sync::mpsc;
20use tracing::{info, warn};
21
22/// Shared block store type used by the engine, RPC, and sync responder.
23pub type SharedBlockStore = Arc<RwLock<Box<dyn BlockStore>>>;
24
25/// Trait for persisting critical consensus state across restarts.
26pub trait StatePersistence: Send {
27    fn save_current_view(&mut self, view: ViewNumber);
28    fn save_locked_qc(&mut self, qc: &QuorumCertificate);
29    fn save_highest_qc(&mut self, qc: &QuorumCertificate);
30    fn save_last_committed_height(&mut self, height: Height);
31    fn save_current_epoch(&mut self, epoch: &Epoch);
32    fn flush(&self);
33}
34
35pub struct ConsensusEngine {
36    state: ConsensusState,
37    store: SharedBlockStore,
38    network: Box<dyn NetworkSink>,
39    app: Box<dyn Application>,
40    signer: Box<dyn Signer>,
41    verifier: Box<dyn Verifier>,
42    vote_collector: VoteCollector,
43    pacemaker: Pacemaker,
44    pacemaker_config: PacemakerConfig,
45    msg_rx: mpsc::Receiver<(ValidatorId, ConsensusMessage)>,
46    /// Collected unique status cert senders (for leader, per view)
47    status_senders: HashSet<ValidatorId>,
48    /// The QC formed in this view's first voting round (used to build DoubleCert)
49    current_view_qc: Option<QuorumCertificate>,
50    /// Pending epoch transition (set by try_commit, applied in advance_view_to)
51    pending_epoch: Option<Epoch>,
52    /// Optional state persistence (for crash recovery).
53    persistence: Option<Box<dyn StatePersistence>>,
54}
55
56/// Configuration for ConsensusEngine.
57pub struct EngineConfig {
58    pub verifier: Box<dyn Verifier>,
59    pub pacemaker: Option<PacemakerConfig>,
60    pub persistence: Option<Box<dyn StatePersistence>>,
61}
62
63impl ConsensusEngine {
64    pub fn new(
65        state: ConsensusState,
66        store: SharedBlockStore,
67        network: Box<dyn NetworkSink>,
68        app: Box<dyn Application>,
69        signer: Box<dyn Signer>,
70        msg_rx: mpsc::Receiver<(ValidatorId, ConsensusMessage)>,
71        config: EngineConfig,
72    ) -> Self {
73        let pc = config.pacemaker.unwrap_or_default();
74        Self {
75            state,
76            store,
77            network,
78            app,
79            signer,
80            verifier: config.verifier,
81            vote_collector: VoteCollector::new(),
82            pacemaker: Pacemaker::with_config(pc.clone()),
83            pacemaker_config: pc,
84            msg_rx,
85            status_senders: HashSet::new(),
86            current_view_qc: None,
87            pending_epoch: None,
88            persistence: config.persistence,
89        }
90    }
91
92    /// Bootstrap and start the event loop.
93    /// If persisted state was restored (current_view > 1), skip genesis bootstrap.
94    pub async fn run(mut self) {
95        if self.state.current_view.as_u64() <= 1 {
96            self.enter_genesis_view();
97        } else {
98            info!(
99                validator = %self.state.validator_id,
100                view = %self.state.current_view,
101                height = %self.state.last_committed_height,
102                "resuming from persisted state"
103            );
104            self.pacemaker.reset_timer();
105        }
106
107        loop {
108            let deadline = self.pacemaker.sleep_until_deadline();
109            tokio::pin!(deadline);
110
111            tokio::select! {
112                Some((sender, msg)) = self.msg_rx.recv() => {
113                    if let Err(e) = self.handle_message(sender, msg) {
114                        warn!(validator = %self.state.validator_id, error = %e, "error handling message");
115                    }
116                }
117                _ = &mut deadline => {
118                    self.handle_timeout();
119                }
120            }
121        }
122    }
123
124    fn enter_genesis_view(&mut self) {
125        // Create a synthetic genesis QC so the first leader can propose
126        let genesis_qc = QuorumCertificate {
127            block_hash: BlockHash::GENESIS,
128            view: ViewNumber::GENESIS,
129            aggregate_signature: AggregateSignature::new(
130                self.state.validator_set.validator_count(),
131            ),
132        };
133        self.state.highest_qc = Some(genesis_qc);
134
135        let view = ViewNumber(1);
136        view_protocol::enter_view(
137            &mut self.state,
138            view,
139            ViewEntryTrigger::Genesis,
140            self.network.as_ref(),
141            self.signer.as_ref(),
142        );
143        self.pacemaker.reset_timer();
144
145        // If leader of genesis view, propose immediately
146        if self.state.is_leader() {
147            self.state.step = ViewStep::WaitingForStatus;
148            // In genesis, skip status wait — propose directly
149            self.try_propose();
150        }
151    }
152
153    fn try_propose(&mut self) {
154        let mut store = self.store.write().unwrap();
155        match view_protocol::propose(
156            &mut self.state,
157            store.as_mut(),
158            self.network.as_ref(),
159            self.app.as_ref(),
160            self.signer.as_ref(),
161        ) {
162            Ok(block) => {
163                drop(store);
164                // Leader votes for its own block
165                self.leader_self_vote(block.hash);
166            }
167            Err(e) => {
168                warn!(
169                    validator = %self.state.validator_id,
170                    error = %e,
171                    "failed to propose"
172                );
173            }
174        }
175    }
176
177    fn leader_self_vote(&mut self, block_hash: BlockHash) {
178        let vote_bytes = Vote::signing_bytes(self.state.current_view, &block_hash, VoteType::Vote);
179        let signature = self.signer.sign(&vote_bytes);
180        let vote = Vote {
181            block_hash,
182            view: self.state.current_view,
183            validator: self.state.validator_id,
184            signature,
185            vote_type: VoteType::Vote,
186        };
187        match self
188            .vote_collector
189            .add_vote(&self.state.validator_set, vote)
190        {
191            Ok(result) => {
192                self.handle_equivocation(&result);
193                if let Some(qc) = result.qc {
194                    self.on_qc_formed(qc);
195                }
196            }
197            Err(e) => warn!(error = %e, "failed to add self vote"),
198        }
199    }
200
201    /// Verify the cryptographic signature on an inbound consensus message.
202    /// Returns false (and logs a warning) if verification fails.
203    /// Messages from past views are skipped (they'll be dropped by handle_message anyway).
204    fn verify_message(&self, msg: &ConsensusMessage) -> bool {
205        // Skip verification for non-Propose past-view messages — these may have
206        // been signed by a previous epoch's validator set. They'll be dropped by
207        // view checks. Propose messages are always verified because they may still
208        // be stored (for chain continuity in fast-forward).
209        let msg_view = match msg {
210            ConsensusMessage::Propose { .. } => None, // always verify proposals
211            ConsensusMessage::VoteMsg(v) | ConsensusMessage::Vote2Msg(v) => Some(v.view),
212            ConsensusMessage::Prepare { certificate, .. } => Some(certificate.view),
213            ConsensusMessage::Wish { target_view, .. } => Some(*target_view),
214            ConsensusMessage::TimeoutCert(tc) => Some(ViewNumber(tc.view.as_u64() + 1)),
215            ConsensusMessage::StatusCert { .. } => None,
216        };
217        if let Some(v) = msg_view
218            && v < self.state.current_view
219        {
220            return true; // will be dropped by handler
221        }
222
223        let vs = &self.state.validator_set;
224        match msg {
225            ConsensusMessage::Propose {
226                block,
227                justify,
228                signature,
229                ..
230            } => {
231                let proposer = vs.get(block.proposer);
232                let Some(vi) = proposer else {
233                    warn!(proposer = %block.proposer, "propose from unknown validator");
234                    return false;
235                };
236                let bytes = view_protocol::proposal_signing_bytes(block, justify);
237                if !self.verifier.verify(&vi.public_key, &bytes, signature) {
238                    warn!(proposer = %block.proposer, "invalid proposal signature");
239                    return false;
240                }
241                // Verify justify QC aggregate signature (skip genesis QC which has no signers)
242                if justify.aggregate_signature.count() > 0 {
243                    let qc_bytes =
244                        Vote::signing_bytes(justify.view, &justify.block_hash, VoteType::Vote);
245                    if !self
246                        .verifier
247                        .verify_aggregate(vs, &qc_bytes, &justify.aggregate_signature)
248                    {
249                        warn!(proposer = %block.proposer, "invalid justify QC aggregate signature");
250                        return false;
251                    }
252                }
253                true
254            }
255            ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
256                let Some(vi) = vs.get(vote.validator) else {
257                    warn!(validator = %vote.validator, "vote from unknown validator");
258                    return false;
259                };
260                let bytes = Vote::signing_bytes(vote.view, &vote.block_hash, vote.vote_type);
261                if !self
262                    .verifier
263                    .verify(&vi.public_key, &bytes, &vote.signature)
264                {
265                    warn!(validator = %vote.validator, "invalid vote signature");
266                    return false;
267                }
268                true
269            }
270            ConsensusMessage::Prepare {
271                certificate,
272                signature,
273            } => {
274                // Verify the leader's signature on the prepare message
275                let leader = vs.leader_for_view(certificate.view);
276                let bytes = view_protocol::prepare_signing_bytes(certificate);
277                if !self.verifier.verify(&leader.public_key, &bytes, signature) {
278                    warn!(view = %certificate.view, "invalid prepare signature");
279                    return false;
280                }
281                // Also verify the QC's aggregate signature
282                let qc_bytes =
283                    Vote::signing_bytes(certificate.view, &certificate.block_hash, VoteType::Vote);
284                if !self
285                    .verifier
286                    .verify_aggregate(vs, &qc_bytes, &certificate.aggregate_signature)
287                {
288                    warn!(view = %certificate.view, "invalid QC aggregate signature");
289                    return false;
290                }
291                true
292            }
293            ConsensusMessage::Wish {
294                target_view,
295                validator,
296                signature,
297                ..
298            } => {
299                let Some(vi) = vs.get(*validator) else {
300                    warn!(validator = %validator, "wish from unknown validator");
301                    return false;
302                };
303                let bytes = crate::pacemaker::wish_signing_bytes(*target_view);
304                if !self.verifier.verify(&vi.public_key, &bytes, signature) {
305                    warn!(validator = %validator, "invalid wish signature");
306                    return false;
307                }
308                true
309            }
310            ConsensusMessage::TimeoutCert(tc) => {
311                // TC aggregate signature: individual wishes bind highest_qc,
312                // but aggregate verification can't split per-validator.
313                // Verify with None (base signing bytes) — individual wish
314                // verification at add_wish provides the full binding.
315                let bytes = crate::pacemaker::wish_signing_bytes(ViewNumber(tc.view.as_u64() + 1));
316                if !self
317                    .verifier
318                    .verify_aggregate(vs, &bytes, &tc.aggregate_signature)
319                {
320                    warn!(view = %tc.view, "invalid TC aggregate signature");
321                    return false;
322                }
323                true
324            }
325            ConsensusMessage::StatusCert {
326                locked_qc,
327                validator,
328                signature,
329            } => {
330                let Some(vi) = vs.get(*validator) else {
331                    warn!(validator = %validator, "status from unknown validator");
332                    return false;
333                };
334                let bytes = view_protocol::status_signing_bytes(self.state.current_view, locked_qc);
335                if !self.verifier.verify(&vi.public_key, &bytes, signature) {
336                    warn!(validator = %validator, "invalid status signature");
337                    return false;
338                }
339                true
340            }
341        }
342    }
343
344    fn handle_message(&mut self, _sender: ValidatorId, msg: ConsensusMessage) -> Result<()> {
345        if !self.verify_message(&msg) {
346            return Ok(());
347        }
348
349        match msg {
350            ConsensusMessage::Propose {
351                block,
352                justify,
353                double_cert,
354                signature: _,
355            } => {
356                let block = *block;
357                let justify = *justify;
358                let double_cert = double_cert.map(|dc| *dc);
359
360                // If proposal is from a future view, advance to it first
361                if block.view > self.state.current_view {
362                    if let Some(ref dc) = double_cert {
363                        // Validate DoubleCert comprehensively:
364                        // 1. Inner and outer QC must reference same block
365                        if dc.inner_qc.block_hash != dc.outer_qc.block_hash {
366                            warn!("double cert inner/outer block_hash mismatch");
367                            return Ok(());
368                        }
369                        // 2. Both QCs must have quorum-level signer count
370                        let quorum = self.state.validator_set.quorum_threshold() as usize;
371                        if dc.inner_qc.aggregate_signature.count() < quorum {
372                            warn!("double cert inner QC insufficient signers");
373                            return Ok(());
374                        }
375                        if dc.outer_qc.aggregate_signature.count() < quorum {
376                            warn!("double cert outer QC insufficient signers");
377                            return Ok(());
378                        }
379                        // 3. Verify inner QC aggregate signature (Vote1)
380                        let inner_bytes = Vote::signing_bytes(
381                            dc.inner_qc.view,
382                            &dc.inner_qc.block_hash,
383                            VoteType::Vote,
384                        );
385                        if !self.verifier.verify_aggregate(
386                            &self.state.validator_set,
387                            &inner_bytes,
388                            &dc.inner_qc.aggregate_signature,
389                        ) {
390                            warn!("double cert inner QC signature invalid");
391                            return Ok(());
392                        }
393                        // 4. Verify outer QC aggregate signature (Vote2)
394                        let outer_bytes = Vote::signing_bytes(
395                            dc.outer_qc.view,
396                            &dc.outer_qc.block_hash,
397                            VoteType::Vote2,
398                        );
399                        if !self.verifier.verify_aggregate(
400                            &self.state.validator_set,
401                            &outer_bytes,
402                            &dc.outer_qc.aggregate_signature,
403                        ) {
404                            warn!("double cert outer QC signature invalid");
405                            return Ok(());
406                        }
407
408                        // Fast-forward via double cert
409                        let store = self.store.read().unwrap();
410                        match try_commit(
411                            dc,
412                            store.as_ref(),
413                            self.app.as_ref(),
414                            &mut self.state.last_committed_height,
415                            &self.state.current_epoch,
416                        ) {
417                            Ok(result) => {
418                                if result.pending_epoch.is_some() {
419                                    self.pending_epoch = result.pending_epoch;
420                                }
421                                drop(store);
422                                // Store commit QC for sync protocol + flush
423                                {
424                                    let mut s = self.store.write().unwrap();
425                                    for block in &result.committed_blocks {
426                                        s.put_commit_qc(block.height, result.commit_qc.clone());
427                                    }
428                                    s.flush();
429                                }
430                            }
431                            Err(e) => {
432                                warn!(error = %e, "try_commit failed during fast-forward");
433                                drop(store);
434                            }
435                        }
436                        self.state.highest_double_cert = Some(dc.clone());
437                        self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()));
438                    } else {
439                        return Ok(());
440                    }
441                } else if block.view < self.state.current_view {
442                    // Still store blocks from past views if we haven't committed
443                    // that height yet. This handles the case where fast-forward
444                    // advanced our view but we missed storing the block from the
445                    // earlier proposal. Without this, chain commits that walk
446                    // the parent chain would fail with "block not found".
447                    if block.height > self.state.last_committed_height {
448                        // Verify block hash before storing past-view blocks
449                        let expected = hotmint_crypto::hash_block(&block);
450                        if block.hash == expected {
451                            let mut store = self.store.write().unwrap();
452                            store.put_block(block);
453                        }
454                    }
455                    return Ok(());
456                }
457
458                let mut store = self.store.write().unwrap();
459                let maybe_pending = view_protocol::on_proposal(
460                    &mut self.state,
461                    view_protocol::ProposalData {
462                        block,
463                        justify,
464                        double_cert,
465                    },
466                    store.as_mut(),
467                    self.network.as_ref(),
468                    self.app.as_ref(),
469                    self.signer.as_ref(),
470                )
471                .c(d!())?;
472                drop(store);
473
474                if let Some(epoch) = maybe_pending {
475                    self.pending_epoch = Some(epoch);
476                }
477            }
478
479            ConsensusMessage::VoteMsg(vote) => {
480                if vote.view != self.state.current_view {
481                    return Ok(());
482                }
483                if !self.state.is_leader() {
484                    return Ok(());
485                }
486                if vote.vote_type != VoteType::Vote {
487                    return Ok(());
488                }
489
490                let result = self
491                    .vote_collector
492                    .add_vote(&self.state.validator_set, vote)
493                    .c(d!())?;
494                self.handle_equivocation(&result);
495                if let Some(qc) = result.qc {
496                    self.on_qc_formed(qc);
497                }
498            }
499
500            ConsensusMessage::Prepare {
501                certificate,
502                signature: _,
503            } => {
504                if certificate.view < self.state.current_view {
505                    return Ok(());
506                }
507                if certificate.view == self.state.current_view {
508                    view_protocol::on_prepare(
509                        &mut self.state,
510                        certificate,
511                        self.network.as_ref(),
512                        self.signer.as_ref(),
513                    );
514                }
515            }
516
517            ConsensusMessage::Vote2Msg(vote) => {
518                if vote.vote_type != VoteType::Vote2 {
519                    return Ok(());
520                }
521                if vote.view != self.state.current_view {
522                    return Ok(());
523                }
524                let result = self
525                    .vote_collector
526                    .add_vote(&self.state.validator_set, vote)
527                    .c(d!())?;
528                self.handle_equivocation(&result);
529                if let Some(outer_qc) = result.qc {
530                    self.on_double_cert_formed(outer_qc);
531                }
532            }
533
534            ConsensusMessage::Wish {
535                target_view,
536                validator,
537                highest_qc,
538                signature,
539            } => {
540                // Validate carried highest_qc (C4 mitigation)
541                if let Some(ref qc) = highest_qc
542                    && qc.aggregate_signature.count() > 0
543                {
544                    let qc_bytes = Vote::signing_bytes(qc.view, &qc.block_hash, VoteType::Vote);
545                    if !self.verifier.verify_aggregate(
546                        &self.state.validator_set,
547                        &qc_bytes,
548                        &qc.aggregate_signature,
549                    ) {
550                        warn!(validator = %validator, "wish carries invalid highest_qc");
551                        return Ok(());
552                    }
553                }
554
555                if let Some(tc) = self.pacemaker.add_wish(
556                    &self.state.validator_set,
557                    target_view,
558                    validator,
559                    highest_qc,
560                    signature,
561                ) {
562                    info!(
563                        validator = %self.state.validator_id,
564                        view = %tc.view,
565                        "TC formed, advancing view"
566                    );
567                    self.network
568                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
569                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
570                }
571            }
572
573            ConsensusMessage::TimeoutCert(tc) => {
574                if self.pacemaker.should_relay_tc(&tc) {
575                    self.network
576                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
577                }
578                let new_view = ViewNumber(tc.view.as_u64() + 1);
579                if new_view > self.state.current_view {
580                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
581                }
582            }
583
584            ConsensusMessage::StatusCert {
585                locked_qc,
586                validator,
587                signature: _,
588            } => {
589                if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
590                    if let Some(ref qc) = locked_qc {
591                        self.state.update_highest_qc(qc);
592                    }
593                    self.status_senders.insert(validator);
594                    let status_power: u64 = self
595                        .status_senders
596                        .iter()
597                        .map(|v| self.state.validator_set.power_of(*v))
598                        .sum();
599                    // Leader's own power counts toward quorum
600                    let total_power =
601                        status_power + self.state.validator_set.power_of(self.state.validator_id);
602                    if total_power >= self.state.validator_set.quorum_threshold() {
603                        self.try_propose();
604                    }
605                }
606            }
607        }
608        Ok(())
609    }
610
611    fn handle_equivocation(&self, result: &crate::vote_collector::VoteResult) {
612        if let Some(ref proof) = result.equivocation {
613            warn!(
614                validator = %proof.validator,
615                view = %proof.view,
616                "equivocation detected!"
617            );
618            if let Err(e) = self.app.on_evidence(proof) {
619                warn!(error = %e, "on_evidence callback failed");
620            }
621        }
622    }
623
624    fn on_qc_formed(&mut self, qc: QuorumCertificate) {
625        // Save the QC so we can reliably pair it when forming a DoubleCert
626        self.current_view_qc = Some(qc.clone());
627
628        view_protocol::on_votes_collected(
629            &mut self.state,
630            qc.clone(),
631            self.network.as_ref(),
632            self.signer.as_ref(),
633        );
634
635        // Leader also does vote2 for its own prepare (self-vote for step 5)
636        let vote_bytes =
637            Vote::signing_bytes(self.state.current_view, &qc.block_hash, VoteType::Vote2);
638        let signature = self.signer.sign(&vote_bytes);
639        let vote = Vote {
640            block_hash: qc.block_hash,
641            view: self.state.current_view,
642            validator: self.state.validator_id,
643            signature,
644            vote_type: VoteType::Vote2,
645        };
646
647        // Lock on this QC
648        self.state.update_locked_qc(&qc);
649
650        let next_leader_id =
651            leader::next_leader(&self.state.validator_set, self.state.current_view);
652        if next_leader_id == self.state.validator_id {
653            // We are the next leader, collect vote2 locally
654            match self
655                .vote_collector
656                .add_vote(&self.state.validator_set, vote)
657            {
658                Ok(result) => {
659                    self.handle_equivocation(&result);
660                    if let Some(outer_qc) = result.qc {
661                        self.on_double_cert_formed(outer_qc);
662                    }
663                }
664                Err(e) => warn!(error = %e, "failed to add self vote2"),
665            }
666        } else {
667            self.network
668                .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
669        }
670    }
671
672    fn on_double_cert_formed(&mut self, outer_qc: QuorumCertificate) {
673        // Use the QC we explicitly saved from this view's first voting round
674        let inner_qc = match self.current_view_qc.take() {
675            Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
676            _ => {
677                // Fallback to locked_qc or highest_qc
678                match &self.state.locked_qc {
679                    Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
680                    _ => match &self.state.highest_qc {
681                        Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
682                        _ => {
683                            warn!(
684                                validator = %self.state.validator_id,
685                                "double cert formed but can't find matching inner QC"
686                            );
687                            return;
688                        }
689                    },
690                }
691            }
692        };
693
694        let dc = DoubleCertificate { inner_qc, outer_qc };
695
696        info!(
697            validator = %self.state.validator_id,
698            view = %self.state.current_view,
699            hash = %dc.inner_qc.block_hash,
700            "double certificate formed, committing"
701        );
702
703        // Commit
704        {
705            let store = self.store.read().unwrap();
706            match try_commit(
707                &dc,
708                store.as_ref(),
709                self.app.as_ref(),
710                &mut self.state.last_committed_height,
711                &self.state.current_epoch,
712            ) {
713                Ok(result) => {
714                    if result.pending_epoch.is_some() {
715                        self.pending_epoch = result.pending_epoch;
716                    }
717                    drop(store);
718                    {
719                        let mut s = self.store.write().unwrap();
720                        for block in &result.committed_blocks {
721                            s.put_commit_qc(block.height, result.commit_qc.clone());
722                        }
723                        s.flush();
724                    }
725                }
726                Err(e) => {
727                    warn!(error = %e, "try_commit failed in double cert handler");
728                    drop(store);
729                }
730            }
731        }
732
733        self.state.highest_double_cert = Some(dc.clone());
734
735        // Advance to next view — as new leader, include DC in proposal
736        self.advance_view(ViewEntryTrigger::DoubleCert(dc));
737    }
738
739    fn handle_timeout(&mut self) {
740        info!(
741            validator = %self.state.validator_id,
742            view = %self.state.current_view,
743            "view timeout, sending wish"
744        );
745
746        let wish = self.pacemaker.build_wish(
747            self.state.current_view,
748            self.state.validator_id,
749            self.state.highest_qc.clone(),
750            self.signer.as_ref(),
751        );
752
753        self.network.broadcast(wish.clone());
754
755        // Also process our own wish
756        if let ConsensusMessage::Wish {
757            target_view,
758            validator,
759            highest_qc,
760            signature,
761        } = wish
762            && let Some(tc) = self.pacemaker.add_wish(
763                &self.state.validator_set,
764                target_view,
765                validator,
766                highest_qc,
767                signature,
768            )
769        {
770            self.network
771                .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
772            self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
773            return;
774        }
775
776        // Exponential backoff on repeated timeouts
777        self.pacemaker.on_timeout();
778    }
779
780    fn persist_state(&mut self) {
781        if let Some(p) = self.persistence.as_mut() {
782            p.save_current_view(self.state.current_view);
783            if let Some(ref qc) = self.state.locked_qc {
784                p.save_locked_qc(qc);
785            }
786            if let Some(ref qc) = self.state.highest_qc {
787                p.save_highest_qc(qc);
788            }
789            p.save_last_committed_height(self.state.last_committed_height);
790            p.save_current_epoch(&self.state.current_epoch);
791            p.flush();
792        }
793    }
794
795    fn advance_view(&mut self, trigger: ViewEntryTrigger) {
796        let new_view = match &trigger {
797            ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
798            ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
799            ViewEntryTrigger::Genesis => ViewNumber(1),
800        };
801        self.advance_view_to(new_view, trigger);
802    }
803
804    fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
805        if new_view <= self.state.current_view {
806            return;
807        }
808
809        // Reset backoff on successful progress (DoubleCert path)
810        let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
811
812        self.vote_collector.clear_view(self.state.current_view);
813        self.vote_collector.prune_before(self.state.current_view);
814        self.pacemaker.clear_view(self.state.current_view);
815        self.status_senders.clear();
816        self.current_view_qc = None;
817
818        // Epoch transition: apply pending validator set change when we reach the
819        // epoch's start_view. The start_view is set deterministically (commit_view + 2)
820        // so all honest nodes apply the transition at the same view.
821        if let Some(ref epoch) = self.pending_epoch
822            && new_view >= epoch.start_view
823        {
824            let new_epoch = self.pending_epoch.take().unwrap();
825            info!(
826                validator = %self.state.validator_id,
827                old_epoch = %self.state.current_epoch.number,
828                new_epoch = %new_epoch.number,
829                start_view = %new_epoch.start_view,
830                validators = new_epoch.validator_set.validator_count(),
831                "epoch transition"
832            );
833            self.state.validator_set = new_epoch.validator_set.clone();
834            self.state.current_epoch = new_epoch;
835            // Notify network layer of the new validator set
836            self.network.on_epoch_change(&self.state.validator_set);
837            // Full clear: old votes/wishes are from the previous epoch's validator set
838            self.vote_collector = VoteCollector::new();
839            self.pacemaker = Pacemaker::with_config(self.pacemaker_config.clone());
840        }
841
842        view_protocol::enter_view(
843            &mut self.state,
844            new_view,
845            trigger,
846            self.network.as_ref(),
847            self.signer.as_ref(),
848        );
849
850        if is_progress {
851            self.pacemaker.reset_on_progress();
852        } else {
853            self.pacemaker.reset_timer();
854        }
855
856        self.persist_state();
857
858        // If we're the leader, propose immediately.
859        // Note: in a full implementation, the leader would collect StatusCerts
860        // before proposing (status_senders quorum gate). Currently the immediate
861        // propose path is required for liveness across epoch transitions where
862        // cross-epoch verification complexity can stall status collection.
863        if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
864            self.try_propose();
865        }
866    }
867}