Skip to main content

hotmint_consensus/
engine.rs

1use ruc::*;
2
3use std::collections::HashSet;
4use std::sync::{Arc, RwLock};
5
6use crate::application::Application;
7use crate::commit::try_commit;
8use crate::leader;
9use crate::network::NetworkSink;
10use crate::pacemaker::{Pacemaker, PacemakerConfig};
11use crate::state::{ConsensusState, ViewStep};
12use crate::store::BlockStore;
13use crate::view_protocol::{self, ViewEntryTrigger};
14use crate::vote_collector::VoteCollector;
15
16use hotmint_types::epoch::Epoch;
17use hotmint_types::vote::VoteType;
18use hotmint_types::*;
19use tokio::sync::mpsc;
20use tracing::{info, warn};
21
22/// Shared block store type used by the engine, RPC, and sync responder.
23pub type SharedBlockStore = Arc<RwLock<Box<dyn BlockStore>>>;
24
25/// Trait for persisting critical consensus state across restarts.
26pub trait StatePersistence: Send {
27    fn save_current_view(&mut self, view: ViewNumber);
28    fn save_locked_qc(&mut self, qc: &QuorumCertificate);
29    fn save_highest_qc(&mut self, qc: &QuorumCertificate);
30    fn save_last_committed_height(&mut self, height: Height);
31    fn save_current_epoch(&mut self, epoch: &Epoch);
32    fn flush(&self);
33}
34
35pub struct ConsensusEngine {
36    state: ConsensusState,
37    store: SharedBlockStore,
38    network: Box<dyn NetworkSink>,
39    app: Box<dyn Application>,
40    signer: Box<dyn Signer>,
41    verifier: Box<dyn Verifier>,
42    vote_collector: VoteCollector,
43    pacemaker: Pacemaker,
44    pacemaker_config: PacemakerConfig,
45    msg_rx: mpsc::Receiver<(ValidatorId, ConsensusMessage)>,
46    /// Collected unique status cert senders (for leader, per view)
47    status_senders: HashSet<ValidatorId>,
48    /// The QC formed in this view's first voting round (used to build DoubleCert)
49    current_view_qc: Option<QuorumCertificate>,
50    /// Pending epoch transition (set by try_commit, applied in advance_view_to)
51    pending_epoch: Option<Epoch>,
52    /// Optional state persistence (for crash recovery).
53    persistence: Option<Box<dyn StatePersistence>>,
54}
55
56/// Configuration for ConsensusEngine.
57pub struct EngineConfig {
58    pub verifier: Box<dyn Verifier>,
59    pub pacemaker: Option<PacemakerConfig>,
60    pub persistence: Option<Box<dyn StatePersistence>>,
61}
62
63impl EngineConfig {
64    /// Create an `EngineConfig` with the given verifier and defaults
65    /// (no custom pacemaker, no persistence).
66    pub fn new(verifier: Box<dyn Verifier>) -> Self {
67        Self {
68            verifier,
69            pacemaker: None,
70            persistence: None,
71        }
72    }
73
74    /// Set a custom pacemaker configuration.
75    pub fn with_pacemaker(mut self, pacemaker: PacemakerConfig) -> Self {
76        self.pacemaker = Some(pacemaker);
77        self
78    }
79
80    /// Set a state persistence backend.
81    pub fn with_persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
82        self.persistence = Some(persistence);
83        self
84    }
85}
86
87/// Builder for constructing a `ConsensusEngine` with a fluent API.
88///
89/// # Example
90/// ```rust,ignore
91/// let engine = ConsensusEngineBuilder::new()
92///     .state(state)
93///     .store(store)
94///     .network(network)
95///     .app(app)
96///     .signer(signer)
97///     .messages(msg_rx)
98///     .verifier(verifier)
99///     .build()
100///     .expect("all required fields must be set");
101/// ```
102pub struct ConsensusEngineBuilder {
103    state: Option<ConsensusState>,
104    store: Option<SharedBlockStore>,
105    network: Option<Box<dyn NetworkSink>>,
106    app: Option<Box<dyn Application>>,
107    signer: Option<Box<dyn Signer>>,
108    msg_rx: Option<mpsc::Receiver<(ValidatorId, ConsensusMessage)>>,
109    verifier: Option<Box<dyn Verifier>>,
110    pacemaker: Option<PacemakerConfig>,
111    persistence: Option<Box<dyn StatePersistence>>,
112}
113
114impl ConsensusEngineBuilder {
115    pub fn new() -> Self {
116        Self {
117            state: None,
118            store: None,
119            network: None,
120            app: None,
121            signer: None,
122            msg_rx: None,
123            verifier: None,
124            pacemaker: None,
125            persistence: None,
126        }
127    }
128
129    pub fn state(mut self, state: ConsensusState) -> Self {
130        self.state = Some(state);
131        self
132    }
133
134    pub fn store(mut self, store: SharedBlockStore) -> Self {
135        self.store = Some(store);
136        self
137    }
138
139    pub fn network(mut self, network: Box<dyn NetworkSink>) -> Self {
140        self.network = Some(network);
141        self
142    }
143
144    pub fn app(mut self, app: Box<dyn Application>) -> Self {
145        self.app = Some(app);
146        self
147    }
148
149    pub fn signer(mut self, signer: Box<dyn Signer>) -> Self {
150        self.signer = Some(signer);
151        self
152    }
153
154    pub fn messages(mut self, msg_rx: mpsc::Receiver<(ValidatorId, ConsensusMessage)>) -> Self {
155        self.msg_rx = Some(msg_rx);
156        self
157    }
158
159    pub fn verifier(mut self, verifier: Box<dyn Verifier>) -> Self {
160        self.verifier = Some(verifier);
161        self
162    }
163
164    pub fn pacemaker(mut self, config: PacemakerConfig) -> Self {
165        self.pacemaker = Some(config);
166        self
167    }
168
169    pub fn persistence(mut self, persistence: Box<dyn StatePersistence>) -> Self {
170        self.persistence = Some(persistence);
171        self
172    }
173
174    pub fn build(self) -> ruc::Result<ConsensusEngine> {
175        let state = self.state.ok_or_else(|| ruc::eg!("state is required"))?;
176        let store = self.store.ok_or_else(|| ruc::eg!("store is required"))?;
177        let network = self
178            .network
179            .ok_or_else(|| ruc::eg!("network is required"))?;
180        let app = self.app.ok_or_else(|| ruc::eg!("app is required"))?;
181        let signer = self.signer.ok_or_else(|| ruc::eg!("signer is required"))?;
182        let msg_rx = self
183            .msg_rx
184            .ok_or_else(|| ruc::eg!("messages (msg_rx) is required"))?;
185        let verifier = self
186            .verifier
187            .ok_or_else(|| ruc::eg!("verifier is required"))?;
188
189        let config = EngineConfig {
190            verifier,
191            pacemaker: self.pacemaker,
192            persistence: self.persistence,
193        };
194
195        Ok(ConsensusEngine::new(
196            state, store, network, app, signer, msg_rx, config,
197        ))
198    }
199}
200
201impl Default for ConsensusEngineBuilder {
202    fn default() -> Self {
203        Self::new()
204    }
205}
206
207impl ConsensusEngine {
208    pub fn new(
209        state: ConsensusState,
210        store: SharedBlockStore,
211        network: Box<dyn NetworkSink>,
212        app: Box<dyn Application>,
213        signer: Box<dyn Signer>,
214        msg_rx: mpsc::Receiver<(ValidatorId, ConsensusMessage)>,
215        config: EngineConfig,
216    ) -> Self {
217        let pc = config.pacemaker.unwrap_or_default();
218        Self {
219            state,
220            store,
221            network,
222            app,
223            signer,
224            verifier: config.verifier,
225            vote_collector: VoteCollector::new(),
226            pacemaker: Pacemaker::with_config(pc.clone()),
227            pacemaker_config: pc,
228            msg_rx,
229            status_senders: HashSet::new(),
230            current_view_qc: None,
231            pending_epoch: None,
232            persistence: config.persistence,
233        }
234    }
235
236    /// Bootstrap and start the event loop.
237    /// If persisted state was restored (current_view > 1), skip genesis bootstrap.
238    pub async fn run(mut self) {
239        if self.state.current_view.as_u64() <= 1 {
240            self.enter_genesis_view();
241        } else {
242            info!(
243                validator = %self.state.validator_id,
244                view = %self.state.current_view,
245                height = %self.state.last_committed_height,
246                "resuming from persisted state"
247            );
248            self.pacemaker.reset_timer();
249        }
250
251        loop {
252            let deadline = self.pacemaker.sleep_until_deadline();
253            tokio::pin!(deadline);
254
255            tokio::select! {
256                Some((sender, msg)) = self.msg_rx.recv() => {
257                    if let Err(e) = self.handle_message(sender, msg) {
258                        warn!(validator = %self.state.validator_id, error = %e, "error handling message");
259                    }
260                }
261                _ = &mut deadline => {
262                    self.handle_timeout();
263                }
264            }
265        }
266    }
267
268    fn enter_genesis_view(&mut self) {
269        // Create a synthetic genesis QC so the first leader can propose
270        let genesis_qc = QuorumCertificate {
271            block_hash: BlockHash::GENESIS,
272            view: ViewNumber::GENESIS,
273            aggregate_signature: AggregateSignature::new(
274                self.state.validator_set.validator_count(),
275            ),
276        };
277        self.state.highest_qc = Some(genesis_qc);
278
279        let view = ViewNumber(1);
280        view_protocol::enter_view(
281            &mut self.state,
282            view,
283            ViewEntryTrigger::Genesis,
284            self.network.as_ref(),
285            self.signer.as_ref(),
286        );
287        self.pacemaker.reset_timer();
288
289        // If leader of genesis view, propose immediately
290        if self.state.is_leader() {
291            self.state.step = ViewStep::WaitingForStatus;
292            // In genesis, skip status wait — propose directly
293            self.try_propose();
294        }
295    }
296
297    fn try_propose(&mut self) {
298        let mut store = self.store.write().unwrap();
299        match view_protocol::propose(
300            &mut self.state,
301            store.as_mut(),
302            self.network.as_ref(),
303            self.app.as_ref(),
304            self.signer.as_ref(),
305        ) {
306            Ok(block) => {
307                drop(store);
308                // Leader votes for its own block
309                self.leader_self_vote(block.hash);
310            }
311            Err(e) => {
312                warn!(
313                    validator = %self.state.validator_id,
314                    error = %e,
315                    "failed to propose"
316                );
317            }
318        }
319    }
320
321    fn leader_self_vote(&mut self, block_hash: BlockHash) {
322        let vote_bytes = Vote::signing_bytes(self.state.current_view, &block_hash, VoteType::Vote);
323        let signature = self.signer.sign(&vote_bytes);
324        let vote = Vote {
325            block_hash,
326            view: self.state.current_view,
327            validator: self.state.validator_id,
328            signature,
329            vote_type: VoteType::Vote,
330        };
331        match self
332            .vote_collector
333            .add_vote(&self.state.validator_set, vote)
334        {
335            Ok(result) => {
336                self.handle_equivocation(&result);
337                if let Some(qc) = result.qc {
338                    self.on_qc_formed(qc);
339                }
340            }
341            Err(e) => warn!(error = %e, "failed to add self vote"),
342        }
343    }
344
345    /// Verify the cryptographic signature on an inbound consensus message.
346    /// Returns false (and logs a warning) if verification fails.
347    /// Messages from past views are skipped (they'll be dropped by handle_message anyway).
348    fn verify_message(&self, msg: &ConsensusMessage) -> bool {
349        // Skip verification for non-Propose past-view messages — these may have
350        // been signed by a previous epoch's validator set. They'll be dropped by
351        // view checks. Propose messages are always verified because they may still
352        // be stored (for chain continuity in fast-forward).
353        let msg_view = match msg {
354            ConsensusMessage::Propose { .. } => None, // always verify proposals
355            ConsensusMessage::VoteMsg(v) | ConsensusMessage::Vote2Msg(v) => Some(v.view),
356            ConsensusMessage::Prepare { certificate, .. } => Some(certificate.view),
357            ConsensusMessage::Wish { target_view, .. } => Some(*target_view),
358            ConsensusMessage::TimeoutCert(tc) => Some(ViewNumber(tc.view.as_u64() + 1)),
359            ConsensusMessage::StatusCert { .. } => None,
360        };
361        if let Some(v) = msg_view
362            && v < self.state.current_view
363        {
364            return true; // will be dropped by handler
365        }
366
367        let vs = &self.state.validator_set;
368        match msg {
369            ConsensusMessage::Propose {
370                block,
371                justify,
372                signature,
373                ..
374            } => {
375                let proposer = vs.get(block.proposer);
376                let Some(vi) = proposer else {
377                    warn!(proposer = %block.proposer, "propose from unknown validator");
378                    return false;
379                };
380                let bytes = view_protocol::proposal_signing_bytes(block, justify);
381                if !self.verifier.verify(&vi.public_key, &bytes, signature) {
382                    warn!(proposer = %block.proposer, "invalid proposal signature");
383                    return false;
384                }
385                // Verify justify QC aggregate signature (skip genesis QC which has no signers)
386                if justify.aggregate_signature.count() > 0 {
387                    let qc_bytes =
388                        Vote::signing_bytes(justify.view, &justify.block_hash, VoteType::Vote);
389                    if !self
390                        .verifier
391                        .verify_aggregate(vs, &qc_bytes, &justify.aggregate_signature)
392                    {
393                        warn!(proposer = %block.proposer, "invalid justify QC aggregate signature");
394                        return false;
395                    }
396                }
397                true
398            }
399            ConsensusMessage::VoteMsg(vote) | ConsensusMessage::Vote2Msg(vote) => {
400                let Some(vi) = vs.get(vote.validator) else {
401                    warn!(validator = %vote.validator, "vote from unknown validator");
402                    return false;
403                };
404                let bytes = Vote::signing_bytes(vote.view, &vote.block_hash, vote.vote_type);
405                if !self
406                    .verifier
407                    .verify(&vi.public_key, &bytes, &vote.signature)
408                {
409                    warn!(validator = %vote.validator, "invalid vote signature");
410                    return false;
411                }
412                true
413            }
414            ConsensusMessage::Prepare {
415                certificate,
416                signature,
417            } => {
418                // Verify the leader's signature on the prepare message
419                let leader = vs.leader_for_view(certificate.view);
420                let bytes = view_protocol::prepare_signing_bytes(certificate);
421                if !self.verifier.verify(&leader.public_key, &bytes, signature) {
422                    warn!(view = %certificate.view, "invalid prepare signature");
423                    return false;
424                }
425                // Also verify the QC's aggregate signature
426                let qc_bytes =
427                    Vote::signing_bytes(certificate.view, &certificate.block_hash, VoteType::Vote);
428                if !self
429                    .verifier
430                    .verify_aggregate(vs, &qc_bytes, &certificate.aggregate_signature)
431                {
432                    warn!(view = %certificate.view, "invalid QC aggregate signature");
433                    return false;
434                }
435                true
436            }
437            ConsensusMessage::Wish {
438                target_view,
439                validator,
440                signature,
441                ..
442            } => {
443                let Some(vi) = vs.get(*validator) else {
444                    warn!(validator = %validator, "wish from unknown validator");
445                    return false;
446                };
447                let bytes = crate::pacemaker::wish_signing_bytes(*target_view);
448                if !self.verifier.verify(&vi.public_key, &bytes, signature) {
449                    warn!(validator = %validator, "invalid wish signature");
450                    return false;
451                }
452                true
453            }
454            ConsensusMessage::TimeoutCert(tc) => {
455                // TC aggregate signature: individual wishes bind highest_qc,
456                // but aggregate verification can't split per-validator.
457                // Verify with None (base signing bytes) — individual wish
458                // verification at add_wish provides the full binding.
459                let bytes = crate::pacemaker::wish_signing_bytes(ViewNumber(tc.view.as_u64() + 1));
460                if !self
461                    .verifier
462                    .verify_aggregate(vs, &bytes, &tc.aggregate_signature)
463                {
464                    warn!(view = %tc.view, "invalid TC aggregate signature");
465                    return false;
466                }
467                true
468            }
469            ConsensusMessage::StatusCert {
470                locked_qc,
471                validator,
472                signature,
473            } => {
474                let Some(vi) = vs.get(*validator) else {
475                    warn!(validator = %validator, "status from unknown validator");
476                    return false;
477                };
478                let bytes = view_protocol::status_signing_bytes(self.state.current_view, locked_qc);
479                if !self.verifier.verify(&vi.public_key, &bytes, signature) {
480                    warn!(validator = %validator, "invalid status signature");
481                    return false;
482                }
483                true
484            }
485        }
486    }
487
488    fn handle_message(&mut self, _sender: ValidatorId, msg: ConsensusMessage) -> Result<()> {
489        if !self.verify_message(&msg) {
490            return Ok(());
491        }
492
493        match msg {
494            ConsensusMessage::Propose {
495                block,
496                justify,
497                double_cert,
498                signature: _,
499            } => {
500                let block = *block;
501                let justify = *justify;
502                let double_cert = double_cert.map(|dc| *dc);
503
504                // If proposal is from a future view, advance to it first
505                if block.view > self.state.current_view {
506                    if let Some(ref dc) = double_cert {
507                        // Validate DoubleCert comprehensively:
508                        // 1. Inner and outer QC must reference same block
509                        if dc.inner_qc.block_hash != dc.outer_qc.block_hash {
510                            warn!("double cert inner/outer block_hash mismatch");
511                            return Ok(());
512                        }
513                        // 2. Both QCs must have quorum-level signer count
514                        let quorum = self.state.validator_set.quorum_threshold() as usize;
515                        if dc.inner_qc.aggregate_signature.count() < quorum {
516                            warn!("double cert inner QC insufficient signers");
517                            return Ok(());
518                        }
519                        if dc.outer_qc.aggregate_signature.count() < quorum {
520                            warn!("double cert outer QC insufficient signers");
521                            return Ok(());
522                        }
523                        // 3. Verify inner QC aggregate signature (Vote1)
524                        let inner_bytes = Vote::signing_bytes(
525                            dc.inner_qc.view,
526                            &dc.inner_qc.block_hash,
527                            VoteType::Vote,
528                        );
529                        if !self.verifier.verify_aggregate(
530                            &self.state.validator_set,
531                            &inner_bytes,
532                            &dc.inner_qc.aggregate_signature,
533                        ) {
534                            warn!("double cert inner QC signature invalid");
535                            return Ok(());
536                        }
537                        // 4. Verify outer QC aggregate signature (Vote2)
538                        let outer_bytes = Vote::signing_bytes(
539                            dc.outer_qc.view,
540                            &dc.outer_qc.block_hash,
541                            VoteType::Vote2,
542                        );
543                        if !self.verifier.verify_aggregate(
544                            &self.state.validator_set,
545                            &outer_bytes,
546                            &dc.outer_qc.aggregate_signature,
547                        ) {
548                            warn!("double cert outer QC signature invalid");
549                            return Ok(());
550                        }
551
552                        // Fast-forward via double cert
553                        let store = self.store.read().unwrap();
554                        match try_commit(
555                            dc,
556                            store.as_ref(),
557                            self.app.as_ref(),
558                            &mut self.state.last_committed_height,
559                            &self.state.current_epoch,
560                        ) {
561                            Ok(result) => {
562                                if result.pending_epoch.is_some() {
563                                    self.pending_epoch = result.pending_epoch;
564                                }
565                                drop(store);
566                                // Store commit QC for sync protocol + flush
567                                {
568                                    let mut s = self.store.write().unwrap();
569                                    for block in &result.committed_blocks {
570                                        s.put_commit_qc(block.height, result.commit_qc.clone());
571                                    }
572                                    s.flush();
573                                }
574                            }
575                            Err(e) => {
576                                warn!(error = %e, "try_commit failed during fast-forward");
577                                drop(store);
578                            }
579                        }
580                        self.state.highest_double_cert = Some(dc.clone());
581                        self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()));
582                    } else {
583                        return Ok(());
584                    }
585                } else if block.view < self.state.current_view {
586                    // Still store blocks from past views if we haven't committed
587                    // that height yet. This handles the case where fast-forward
588                    // advanced our view but we missed storing the block from the
589                    // earlier proposal. Without this, chain commits that walk
590                    // the parent chain would fail with "block not found".
591                    if block.height > self.state.last_committed_height {
592                        // Verify block hash before storing past-view blocks
593                        let expected = hotmint_crypto::compute_block_hash(&block);
594                        if block.hash == expected {
595                            let mut store = self.store.write().unwrap();
596                            store.put_block(block);
597                        }
598                    }
599                    return Ok(());
600                }
601
602                let mut store = self.store.write().unwrap();
603                let maybe_pending = view_protocol::on_proposal(
604                    &mut self.state,
605                    view_protocol::ProposalData {
606                        block,
607                        justify,
608                        double_cert,
609                    },
610                    store.as_mut(),
611                    self.network.as_ref(),
612                    self.app.as_ref(),
613                    self.signer.as_ref(),
614                )
615                .c(d!())?;
616                drop(store);
617
618                if let Some(epoch) = maybe_pending {
619                    self.pending_epoch = Some(epoch);
620                }
621            }
622
623            ConsensusMessage::VoteMsg(vote) => {
624                if vote.view != self.state.current_view {
625                    return Ok(());
626                }
627                if !self.state.is_leader() {
628                    return Ok(());
629                }
630                if vote.vote_type != VoteType::Vote {
631                    return Ok(());
632                }
633
634                let result = self
635                    .vote_collector
636                    .add_vote(&self.state.validator_set, vote)
637                    .c(d!())?;
638                self.handle_equivocation(&result);
639                if let Some(qc) = result.qc {
640                    self.on_qc_formed(qc);
641                }
642            }
643
644            ConsensusMessage::Prepare {
645                certificate,
646                signature: _,
647            } => {
648                if certificate.view < self.state.current_view {
649                    return Ok(());
650                }
651                if certificate.view == self.state.current_view {
652                    view_protocol::on_prepare(
653                        &mut self.state,
654                        certificate,
655                        self.network.as_ref(),
656                        self.signer.as_ref(),
657                    );
658                }
659            }
660
661            ConsensusMessage::Vote2Msg(vote) => {
662                if vote.vote_type != VoteType::Vote2 {
663                    return Ok(());
664                }
665                if vote.view != self.state.current_view {
666                    return Ok(());
667                }
668                let result = self
669                    .vote_collector
670                    .add_vote(&self.state.validator_set, vote)
671                    .c(d!())?;
672                self.handle_equivocation(&result);
673                if let Some(outer_qc) = result.qc {
674                    self.on_double_cert_formed(outer_qc);
675                }
676            }
677
678            ConsensusMessage::Wish {
679                target_view,
680                validator,
681                highest_qc,
682                signature,
683            } => {
684                // Validate carried highest_qc (C4 mitigation)
685                if let Some(ref qc) = highest_qc
686                    && qc.aggregate_signature.count() > 0
687                {
688                    let qc_bytes = Vote::signing_bytes(qc.view, &qc.block_hash, VoteType::Vote);
689                    if !self.verifier.verify_aggregate(
690                        &self.state.validator_set,
691                        &qc_bytes,
692                        &qc.aggregate_signature,
693                    ) {
694                        warn!(validator = %validator, "wish carries invalid highest_qc");
695                        return Ok(());
696                    }
697                }
698
699                if let Some(tc) = self.pacemaker.add_wish(
700                    &self.state.validator_set,
701                    target_view,
702                    validator,
703                    highest_qc,
704                    signature,
705                ) {
706                    info!(
707                        validator = %self.state.validator_id,
708                        view = %tc.view,
709                        "TC formed, advancing view"
710                    );
711                    self.network
712                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
713                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
714                }
715            }
716
717            ConsensusMessage::TimeoutCert(tc) => {
718                if self.pacemaker.should_relay_tc(&tc) {
719                    self.network
720                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
721                }
722                let new_view = ViewNumber(tc.view.as_u64() + 1);
723                if new_view > self.state.current_view {
724                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
725                }
726            }
727
728            ConsensusMessage::StatusCert {
729                locked_qc,
730                validator,
731                signature: _,
732            } => {
733                if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
734                    if let Some(ref qc) = locked_qc {
735                        self.state.update_highest_qc(qc);
736                    }
737                    self.status_senders.insert(validator);
738                    let status_power: u64 = self
739                        .status_senders
740                        .iter()
741                        .map(|v| self.state.validator_set.power_of(*v))
742                        .sum();
743                    // Leader's own power counts toward quorum
744                    let total_power =
745                        status_power + self.state.validator_set.power_of(self.state.validator_id);
746                    if total_power >= self.state.validator_set.quorum_threshold() {
747                        self.try_propose();
748                    }
749                }
750            }
751        }
752        Ok(())
753    }
754
755    fn handle_equivocation(&self, result: &crate::vote_collector::VoteResult) {
756        if let Some(ref proof) = result.equivocation {
757            warn!(
758                validator = %proof.validator,
759                view = %proof.view,
760                "equivocation detected!"
761            );
762            if let Err(e) = self.app.on_evidence(proof) {
763                warn!(error = %e, "on_evidence callback failed");
764            }
765        }
766    }
767
768    fn on_qc_formed(&mut self, qc: QuorumCertificate) {
769        // Save the QC so we can reliably pair it when forming a DoubleCert
770        self.current_view_qc = Some(qc.clone());
771
772        view_protocol::on_votes_collected(
773            &mut self.state,
774            qc.clone(),
775            self.network.as_ref(),
776            self.signer.as_ref(),
777        );
778
779        // Leader also does vote2 for its own prepare (self-vote for step 5)
780        let vote_bytes =
781            Vote::signing_bytes(self.state.current_view, &qc.block_hash, VoteType::Vote2);
782        let signature = self.signer.sign(&vote_bytes);
783        let vote = Vote {
784            block_hash: qc.block_hash,
785            view: self.state.current_view,
786            validator: self.state.validator_id,
787            signature,
788            vote_type: VoteType::Vote2,
789        };
790
791        // Lock on this QC
792        self.state.update_locked_qc(&qc);
793
794        let next_leader_id =
795            leader::next_leader(&self.state.validator_set, self.state.current_view);
796        if next_leader_id == self.state.validator_id {
797            // We are the next leader, collect vote2 locally
798            match self
799                .vote_collector
800                .add_vote(&self.state.validator_set, vote)
801            {
802                Ok(result) => {
803                    self.handle_equivocation(&result);
804                    if let Some(outer_qc) = result.qc {
805                        self.on_double_cert_formed(outer_qc);
806                    }
807                }
808                Err(e) => warn!(error = %e, "failed to add self vote2"),
809            }
810        } else {
811            self.network
812                .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
813        }
814    }
815
816    fn on_double_cert_formed(&mut self, outer_qc: QuorumCertificate) {
817        // Use the QC we explicitly saved from this view's first voting round
818        let inner_qc = match self.current_view_qc.take() {
819            Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
820            _ => {
821                // Fallback to locked_qc or highest_qc
822                match &self.state.locked_qc {
823                    Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
824                    _ => match &self.state.highest_qc {
825                        Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
826                        _ => {
827                            warn!(
828                                validator = %self.state.validator_id,
829                                "double cert formed but can't find matching inner QC"
830                            );
831                            return;
832                        }
833                    },
834                }
835            }
836        };
837
838        let dc = DoubleCertificate { inner_qc, outer_qc };
839
840        info!(
841            validator = %self.state.validator_id,
842            view = %self.state.current_view,
843            hash = %dc.inner_qc.block_hash,
844            "double certificate formed, committing"
845        );
846
847        // Commit
848        {
849            let store = self.store.read().unwrap();
850            match try_commit(
851                &dc,
852                store.as_ref(),
853                self.app.as_ref(),
854                &mut self.state.last_committed_height,
855                &self.state.current_epoch,
856            ) {
857                Ok(result) => {
858                    if result.pending_epoch.is_some() {
859                        self.pending_epoch = result.pending_epoch;
860                    }
861                    drop(store);
862                    {
863                        let mut s = self.store.write().unwrap();
864                        for block in &result.committed_blocks {
865                            s.put_commit_qc(block.height, result.commit_qc.clone());
866                        }
867                        s.flush();
868                    }
869                }
870                Err(e) => {
871                    warn!(error = %e, "try_commit failed in double cert handler");
872                    drop(store);
873                }
874            }
875        }
876
877        self.state.highest_double_cert = Some(dc.clone());
878
879        // Advance to next view — as new leader, include DC in proposal
880        self.advance_view(ViewEntryTrigger::DoubleCert(dc));
881    }
882
883    fn handle_timeout(&mut self) {
884        info!(
885            validator = %self.state.validator_id,
886            view = %self.state.current_view,
887            "view timeout, sending wish"
888        );
889
890        let wish = self.pacemaker.build_wish(
891            self.state.current_view,
892            self.state.validator_id,
893            self.state.highest_qc.clone(),
894            self.signer.as_ref(),
895        );
896
897        self.network.broadcast(wish.clone());
898
899        // Also process our own wish
900        if let ConsensusMessage::Wish {
901            target_view,
902            validator,
903            highest_qc,
904            signature,
905        } = wish
906            && let Some(tc) = self.pacemaker.add_wish(
907                &self.state.validator_set,
908                target_view,
909                validator,
910                highest_qc,
911                signature,
912            )
913        {
914            self.network
915                .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
916            self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
917            return;
918        }
919
920        // Exponential backoff on repeated timeouts
921        self.pacemaker.on_timeout();
922    }
923
924    fn persist_state(&mut self) {
925        if let Some(p) = self.persistence.as_mut() {
926            p.save_current_view(self.state.current_view);
927            if let Some(ref qc) = self.state.locked_qc {
928                p.save_locked_qc(qc);
929            }
930            if let Some(ref qc) = self.state.highest_qc {
931                p.save_highest_qc(qc);
932            }
933            p.save_last_committed_height(self.state.last_committed_height);
934            p.save_current_epoch(&self.state.current_epoch);
935            p.flush();
936        }
937    }
938
939    fn advance_view(&mut self, trigger: ViewEntryTrigger) {
940        let new_view = match &trigger {
941            ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
942            ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
943            ViewEntryTrigger::Genesis => ViewNumber(1),
944        };
945        self.advance_view_to(new_view, trigger);
946    }
947
948    fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
949        if new_view <= self.state.current_view {
950            return;
951        }
952
953        // Reset backoff on successful progress (DoubleCert path)
954        let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
955
956        self.vote_collector.clear_view(self.state.current_view);
957        self.vote_collector.prune_before(self.state.current_view);
958        self.pacemaker.clear_view(self.state.current_view);
959        self.status_senders.clear();
960        self.current_view_qc = None;
961
962        // Epoch transition: apply pending validator set change when we reach the
963        // epoch's start_view. The start_view is set deterministically (commit_view + 2)
964        // so all honest nodes apply the transition at the same view.
965        if let Some(ref epoch) = self.pending_epoch
966            && new_view >= epoch.start_view
967        {
968            let new_epoch = self.pending_epoch.take().unwrap();
969            info!(
970                validator = %self.state.validator_id,
971                old_epoch = %self.state.current_epoch.number,
972                new_epoch = %new_epoch.number,
973                start_view = %new_epoch.start_view,
974                validators = new_epoch.validator_set.validator_count(),
975                "epoch transition"
976            );
977            self.state.validator_set = new_epoch.validator_set.clone();
978            self.state.current_epoch = new_epoch;
979            // Notify network layer of the new validator set
980            self.network.on_epoch_change(&self.state.validator_set);
981            // Full clear: old votes/wishes are from the previous epoch's validator set
982            self.vote_collector = VoteCollector::new();
983            self.pacemaker = Pacemaker::with_config(self.pacemaker_config.clone());
984        }
985
986        view_protocol::enter_view(
987            &mut self.state,
988            new_view,
989            trigger,
990            self.network.as_ref(),
991            self.signer.as_ref(),
992        );
993
994        if is_progress {
995            self.pacemaker.reset_on_progress();
996        } else {
997            self.pacemaker.reset_timer();
998        }
999
1000        self.persist_state();
1001
1002        // If we're the leader, propose immediately.
1003        // Note: in a full implementation, the leader would collect StatusCerts
1004        // before proposing (status_senders quorum gate). Currently the immediate
1005        // propose path is required for liveness across epoch transitions where
1006        // cross-epoch verification complexity can stall status collection.
1007        if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
1008            self.try_propose();
1009        }
1010    }
1011}