// hotmint_consensus/engine.rs

1use ruc::*;
2
3use std::sync::{Arc, RwLock};
4
5use crate::application::Application;
6use crate::commit::try_commit;
7use crate::leader;
8use crate::network::NetworkSink;
9use crate::pacemaker::Pacemaker;
10use crate::state::{ConsensusState, ViewStep};
11use crate::store::BlockStore;
12use crate::view_protocol::{self, ViewEntryTrigger};
13use crate::vote_collector::VoteCollector;
14
15use hotmint_types::epoch::Epoch;
16use hotmint_types::vote::VoteType;
17use hotmint_types::*;
18use tokio::sync::mpsc;
19use tracing::{info, warn};
20
21/// Shared block store type used by the engine, RPC, and sync responder.
22pub type SharedBlockStore = Arc<RwLock<Box<dyn BlockStore>>>;
23
24pub struct ConsensusEngine {
25    state: ConsensusState,
26    store: SharedBlockStore,
27    network: Box<dyn NetworkSink>,
28    app: Box<dyn Application>,
29    signer: Box<dyn Signer>,
30    vote_collector: VoteCollector,
31    pacemaker: Pacemaker,
32    msg_rx: mpsc::UnboundedReceiver<(ValidatorId, ConsensusMessage)>,
33    /// Collected status certs from replicas (for leader)
34    status_count: usize,
35    /// The QC formed in this view's first voting round (used to build DoubleCert)
36    current_view_qc: Option<QuorumCertificate>,
37    /// Pending epoch transition (set by try_commit, applied in advance_view_to)
38    pending_epoch: Option<Epoch>,
39}
40
41impl ConsensusEngine {
42    pub fn new(
43        state: ConsensusState,
44        store: SharedBlockStore,
45        network: Box<dyn NetworkSink>,
46        app: Box<dyn Application>,
47        signer: Box<dyn Signer>,
48        msg_rx: mpsc::UnboundedReceiver<(ValidatorId, ConsensusMessage)>,
49    ) -> Self {
50        Self {
51            state,
52            store,
53            network,
54            app,
55            signer,
56            vote_collector: VoteCollector::new(),
57            pacemaker: Pacemaker::new(),
58            msg_rx,
59            status_count: 0,
60            current_view_qc: None,
61            pending_epoch: None,
62        }
63    }
64
65    /// Bootstrap: enter genesis view and start the event loop
66    pub async fn run(mut self) {
67        self.enter_genesis_view();
68
69        loop {
70            let deadline = self.pacemaker.sleep_until_deadline();
71            tokio::pin!(deadline);
72
73            tokio::select! {
74                Some((sender, msg)) = self.msg_rx.recv() => {
75                    if let Err(e) = self.handle_message(sender, msg) {
76                        warn!(validator = %self.state.validator_id, error = %e, "error handling message");
77                    }
78                }
79                _ = &mut deadline => {
80                    self.handle_timeout();
81                }
82            }
83        }
84    }
85
86    fn enter_genesis_view(&mut self) {
87        // Create a synthetic genesis QC so the first leader can propose
88        let genesis_qc = QuorumCertificate {
89            block_hash: BlockHash::GENESIS,
90            view: ViewNumber::GENESIS,
91            aggregate_signature: AggregateSignature::new(
92                self.state.validator_set.validator_count(),
93            ),
94        };
95        self.state.highest_qc = Some(genesis_qc);
96
97        let view = ViewNumber(1);
98        view_protocol::enter_view(
99            &mut self.state,
100            view,
101            ViewEntryTrigger::Genesis,
102            self.network.as_ref(),
103            self.signer.as_ref(),
104        );
105        self.pacemaker.reset_timer();
106
107        // If leader of genesis view, propose immediately
108        if self.state.is_leader() {
109            self.state.step = ViewStep::WaitingForStatus;
110            // In genesis, skip status wait — propose directly
111            self.try_propose();
112        }
113    }
114
115    fn try_propose(&mut self) {
116        let mut store = self.store.write().unwrap();
117        match view_protocol::propose(
118            &mut self.state,
119            store.as_mut(),
120            self.network.as_ref(),
121            self.app.as_ref(),
122            self.signer.as_ref(),
123        ) {
124            Ok(block) => {
125                drop(store);
126                // Leader votes for its own block
127                self.leader_self_vote(block.hash);
128            }
129            Err(e) => {
130                warn!(
131                    validator = %self.state.validator_id,
132                    error = %e,
133                    "failed to propose"
134                );
135            }
136        }
137    }
138
139    fn leader_self_vote(&mut self, block_hash: BlockHash) {
140        let vote_bytes = Vote::signing_bytes(self.state.current_view, &block_hash, VoteType::Vote);
141        let signature = self.signer.sign(&vote_bytes);
142        let vote = Vote {
143            block_hash,
144            view: self.state.current_view,
145            validator: self.state.validator_id,
146            signature,
147            vote_type: VoteType::Vote,
148        };
149        match self
150            .vote_collector
151            .add_vote(&self.state.validator_set, vote)
152        {
153            Ok(result) => {
154                self.handle_equivocation(&result);
155                if let Some(qc) = result.qc {
156                    self.on_qc_formed(qc);
157                }
158            }
159            Err(e) => warn!(error = %e, "failed to add self vote"),
160        }
161    }
162
163    fn handle_message(&mut self, _sender: ValidatorId, msg: ConsensusMessage) -> Result<()> {
164        match msg {
165            ConsensusMessage::Propose {
166                block,
167                justify,
168                double_cert,
169                signature: _,
170            } => {
171                let block = *block;
172                let justify = *justify;
173                let double_cert = double_cert.map(|dc| *dc);
174
175                // If proposal is from a future view, advance to it first
176                if block.view > self.state.current_view {
177                    if let Some(ref dc) = double_cert {
178                        // Fast-forward via double cert
179                        let store = self.store.read().unwrap();
180                        match try_commit(
181                            dc,
182                            store.as_ref(),
183                            self.app.as_ref(),
184                            &mut self.state.last_committed_height,
185                            &self.state.current_epoch,
186                        ) {
187                            Ok(result) => {
188                                if result.pending_epoch.is_some() {
189                                    self.pending_epoch = result.pending_epoch;
190                                }
191                            }
192                            Err(e) => {
193                                warn!(error = %e, "try_commit failed during fast-forward");
194                            }
195                        }
196                        drop(store);
197                        self.state.highest_double_cert = Some(dc.clone());
198                        self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()));
199                    } else {
200                        return Ok(());
201                    }
202                } else if block.view < self.state.current_view {
203                    // Still store blocks from past views if we haven't committed
204                    // that height yet. This handles the case where fast-forward
205                    // advanced our view but we missed storing the block from the
206                    // earlier proposal. Without this, chain commits that walk
207                    // the parent chain would fail with "block not found".
208                    if block.height > self.state.last_committed_height {
209                        let mut store = self.store.write().unwrap();
210                        store.put_block(block);
211                    }
212                    return Ok(());
213                }
214
215                let mut store = self.store.write().unwrap();
216                let maybe_pending = view_protocol::on_proposal(
217                    &mut self.state,
218                    block,
219                    justify,
220                    double_cert,
221                    store.as_mut(),
222                    self.network.as_ref(),
223                    self.app.as_ref(),
224                    self.signer.as_ref(),
225                )
226                .c(d!())?;
227                drop(store);
228
229                if let Some(epoch) = maybe_pending {
230                    self.pending_epoch = Some(epoch);
231                }
232            }
233
234            ConsensusMessage::VoteMsg(vote) => {
235                if vote.view != self.state.current_view {
236                    return Ok(());
237                }
238                if !self.state.is_leader() {
239                    return Ok(());
240                }
241                if vote.vote_type != VoteType::Vote {
242                    return Ok(());
243                }
244
245                let result = self
246                    .vote_collector
247                    .add_vote(&self.state.validator_set, vote)
248                    .c(d!())?;
249                self.handle_equivocation(&result);
250                if let Some(qc) = result.qc {
251                    self.on_qc_formed(qc);
252                }
253            }
254
255            ConsensusMessage::Prepare {
256                certificate,
257                signature: _,
258            } => {
259                if certificate.view < self.state.current_view {
260                    return Ok(());
261                }
262                if certificate.view == self.state.current_view {
263                    view_protocol::on_prepare(
264                        &mut self.state,
265                        certificate,
266                        self.network.as_ref(),
267                        self.signer.as_ref(),
268                    );
269                }
270            }
271
272            ConsensusMessage::Vote2Msg(vote) => {
273                if vote.vote_type != VoteType::Vote2 {
274                    return Ok(());
275                }
276                let result = self
277                    .vote_collector
278                    .add_vote(&self.state.validator_set, vote)
279                    .c(d!())?;
280                self.handle_equivocation(&result);
281                if let Some(outer_qc) = result.qc {
282                    self.on_double_cert_formed(outer_qc);
283                }
284            }
285
286            ConsensusMessage::Wish {
287                target_view,
288                validator,
289                highest_qc,
290                signature,
291            } => {
292                if let Some(tc) = self.pacemaker.add_wish(
293                    &self.state.validator_set,
294                    target_view,
295                    validator,
296                    highest_qc,
297                    signature,
298                ) {
299                    info!(
300                        validator = %self.state.validator_id,
301                        view = %tc.view,
302                        "TC formed, advancing view"
303                    );
304                    self.network
305                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
306                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
307                }
308            }
309
310            ConsensusMessage::TimeoutCert(tc) => {
311                if self.pacemaker.should_relay_tc(&tc) {
312                    self.network
313                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
314                }
315                let new_view = ViewNumber(tc.view.as_u64() + 1);
316                if new_view > self.state.current_view {
317                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
318                }
319            }
320
321            ConsensusMessage::StatusCert {
322                locked_qc,
323                validator: _,
324                signature: _,
325            } => {
326                if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
327                    if let Some(ref qc) = locked_qc {
328                        self.state.update_highest_qc(qc);
329                    }
330                    self.status_count += 1;
331                    let needed = self.state.validator_set.quorum_threshold() as usize - 1;
332                    if self.status_count >= needed {
333                        self.try_propose();
334                    }
335                }
336            }
337        }
338        Ok(())
339    }
340
341    fn handle_equivocation(&self, result: &crate::vote_collector::VoteResult) {
342        if let Some(ref proof) = result.equivocation {
343            warn!(
344                validator = %proof.validator,
345                view = %proof.view,
346                "equivocation detected!"
347            );
348            if let Err(e) = self.app.on_evidence(proof) {
349                warn!(error = %e, "on_evidence callback failed");
350            }
351        }
352    }
353
354    fn on_qc_formed(&mut self, qc: QuorumCertificate) {
355        // Save the QC so we can reliably pair it when forming a DoubleCert
356        self.current_view_qc = Some(qc.clone());
357
358        view_protocol::on_votes_collected(
359            &mut self.state,
360            qc.clone(),
361            self.network.as_ref(),
362            self.signer.as_ref(),
363        );
364
365        // Leader also does vote2 for its own prepare (self-vote for step 5)
366        let vote_bytes =
367            Vote::signing_bytes(self.state.current_view, &qc.block_hash, VoteType::Vote2);
368        let signature = self.signer.sign(&vote_bytes);
369        let vote = Vote {
370            block_hash: qc.block_hash,
371            view: self.state.current_view,
372            validator: self.state.validator_id,
373            signature,
374            vote_type: VoteType::Vote2,
375        };
376
377        // Lock on this QC
378        self.state.update_locked_qc(&qc);
379
380        let next_leader_id =
381            leader::next_leader(&self.state.validator_set, self.state.current_view);
382        if next_leader_id == self.state.validator_id {
383            // We are the next leader, collect vote2 locally
384            match self
385                .vote_collector
386                .add_vote(&self.state.validator_set, vote)
387            {
388                Ok(result) => {
389                    self.handle_equivocation(&result);
390                    if let Some(outer_qc) = result.qc {
391                        self.on_double_cert_formed(outer_qc);
392                    }
393                }
394                Err(e) => warn!(error = %e, "failed to add self vote2"),
395            }
396        } else {
397            self.network
398                .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
399        }
400    }
401
402    fn on_double_cert_formed(&mut self, outer_qc: QuorumCertificate) {
403        // Use the QC we explicitly saved from this view's first voting round
404        let inner_qc = match self.current_view_qc.take() {
405            Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
406            _ => {
407                // Fallback to locked_qc or highest_qc
408                match &self.state.locked_qc {
409                    Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
410                    _ => match &self.state.highest_qc {
411                        Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
412                        _ => {
413                            warn!(
414                                validator = %self.state.validator_id,
415                                "double cert formed but can't find matching inner QC"
416                            );
417                            return;
418                        }
419                    },
420                }
421            }
422        };
423
424        let dc = DoubleCertificate { inner_qc, outer_qc };
425
426        info!(
427            validator = %self.state.validator_id,
428            view = %self.state.current_view,
429            hash = %dc.inner_qc.block_hash,
430            "double certificate formed, committing"
431        );
432
433        // Commit
434        {
435            let store = self.store.read().unwrap();
436            match try_commit(
437                &dc,
438                store.as_ref(),
439                self.app.as_ref(),
440                &mut self.state.last_committed_height,
441                &self.state.current_epoch,
442            ) {
443                Ok(result) => {
444                    if result.pending_epoch.is_some() {
445                        self.pending_epoch = result.pending_epoch;
446                    }
447                }
448                Err(e) => {
449                    warn!(error = %e, "try_commit failed in double cert handler");
450                }
451            }
452        }
453
454        self.state.highest_double_cert = Some(dc.clone());
455
456        // Advance to next view — as new leader, include DC in proposal
457        self.advance_view(ViewEntryTrigger::DoubleCert(dc));
458    }
459
460    fn handle_timeout(&mut self) {
461        info!(
462            validator = %self.state.validator_id,
463            view = %self.state.current_view,
464            "view timeout, sending wish"
465        );
466
467        let wish = self.pacemaker.build_wish(
468            self.state.current_view,
469            self.state.validator_id,
470            self.state.highest_qc.clone(),
471            self.signer.as_ref(),
472        );
473
474        self.network.broadcast(wish.clone());
475
476        // Also process our own wish
477        if let ConsensusMessage::Wish {
478            target_view,
479            validator,
480            highest_qc,
481            signature,
482        } = wish
483            && let Some(tc) = self.pacemaker.add_wish(
484                &self.state.validator_set,
485                target_view,
486                validator,
487                highest_qc,
488                signature,
489            )
490        {
491            self.network
492                .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
493            self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
494            return;
495        }
496
497        // Exponential backoff on repeated timeouts
498        self.pacemaker.on_timeout();
499    }
500
501    fn advance_view(&mut self, trigger: ViewEntryTrigger) {
502        let new_view = match &trigger {
503            ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
504            ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
505            ViewEntryTrigger::Genesis => ViewNumber(1),
506        };
507        self.advance_view_to(new_view, trigger);
508    }
509
510    fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
511        if new_view <= self.state.current_view {
512            return;
513        }
514
515        // Reset backoff on successful progress (DoubleCert path)
516        let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
517
518        self.vote_collector.clear_view(self.state.current_view);
519        self.pacemaker.clear_view(self.state.current_view);
520        self.status_count = 0;
521        self.current_view_qc = None;
522
523        // Epoch transition: apply pending validator set change when we reach the
524        // epoch's start_view. The start_view is set deterministically (commit_view + 2)
525        // so all honest nodes apply the transition at the same view.
526        if let Some(ref epoch) = self.pending_epoch
527            && new_view >= epoch.start_view
528        {
529            let new_epoch = self.pending_epoch.take().unwrap();
530            info!(
531                validator = %self.state.validator_id,
532                old_epoch = %self.state.current_epoch.number,
533                new_epoch = %new_epoch.number,
534                start_view = %new_epoch.start_view,
535                validators = new_epoch.validator_set.validator_count(),
536                "epoch transition"
537            );
538            self.state.validator_set = new_epoch.validator_set.clone();
539            self.state.current_epoch = new_epoch;
540            // Full clear: old votes/wishes are from the previous epoch's validator set
541            self.vote_collector = VoteCollector::new();
542            self.pacemaker = Pacemaker::new();
543        }
544
545        view_protocol::enter_view(
546            &mut self.state,
547            new_view,
548            trigger,
549            self.network.as_ref(),
550            self.signer.as_ref(),
551        );
552
553        if is_progress {
554            self.pacemaker.reset_on_progress();
555        } else {
556            self.pacemaker.reset_timer();
557        }
558
559        // If we're the leader, we may need to propose
560        if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
561            // In simplified version, leader proposes immediately
562            self.try_propose();
563        }
564    }
565}