Skip to main content

hotmint_consensus/
engine.rs

1use ruc::*;
2
3use std::sync::{Arc, RwLock};
4
5use crate::application::Application;
6use crate::commit::try_commit;
7use crate::leader;
8use crate::network::NetworkSink;
9use crate::pacemaker::Pacemaker;
10use crate::state::{ConsensusState, ViewStep};
11use crate::store::BlockStore;
12use crate::view_protocol::{self, ViewEntryTrigger};
13use crate::vote_collector::VoteCollector;
14
15use hotmint_types::epoch::Epoch;
16use hotmint_types::vote::VoteType;
17use hotmint_types::*;
18use tokio::sync::mpsc;
19use tracing::{info, warn};
20
21/// Shared block store type used by the engine, RPC, and sync responder.
22pub type SharedBlockStore = Arc<RwLock<Box<dyn BlockStore>>>;
23
24pub struct ConsensusEngine {
25    state: ConsensusState,
26    store: SharedBlockStore,
27    network: Box<dyn NetworkSink>,
28    app: Box<dyn Application>,
29    signer: Box<dyn Signer>,
30    vote_collector: VoteCollector,
31    pacemaker: Pacemaker,
32    msg_rx: mpsc::UnboundedReceiver<(ValidatorId, ConsensusMessage)>,
33    /// Collected status certs from replicas (for leader)
34    status_count: usize,
35    /// The QC formed in this view's first voting round (used to build DoubleCert)
36    current_view_qc: Option<QuorumCertificate>,
37    /// Pending epoch transition (set by try_commit, applied in advance_view_to)
38    pending_epoch: Option<Epoch>,
39}
40
41impl ConsensusEngine {
42    pub fn new(
43        state: ConsensusState,
44        store: SharedBlockStore,
45        network: Box<dyn NetworkSink>,
46        app: Box<dyn Application>,
47        signer: Box<dyn Signer>,
48        msg_rx: mpsc::UnboundedReceiver<(ValidatorId, ConsensusMessage)>,
49    ) -> Self {
50        Self {
51            state,
52            store,
53            network,
54            app,
55            signer,
56            vote_collector: VoteCollector::new(),
57            pacemaker: Pacemaker::new(),
58            msg_rx,
59            status_count: 0,
60            current_view_qc: None,
61            pending_epoch: None,
62        }
63    }
64
65    /// Bootstrap: enter genesis view and start the event loop
66    pub async fn run(mut self) {
67        self.enter_genesis_view();
68
69        loop {
70            let deadline = self.pacemaker.sleep_until_deadline();
71            tokio::pin!(deadline);
72
73            tokio::select! {
74                Some((sender, msg)) = self.msg_rx.recv() => {
75                    if let Err(e) = self.handle_message(sender, msg) {
76                        warn!(validator = %self.state.validator_id, error = %e, "error handling message");
77                    }
78                }
79                _ = &mut deadline => {
80                    self.handle_timeout();
81                }
82            }
83        }
84    }
85
86    fn enter_genesis_view(&mut self) {
87        // Create a synthetic genesis QC so the first leader can propose
88        let genesis_qc = QuorumCertificate {
89            block_hash: BlockHash::GENESIS,
90            view: ViewNumber::GENESIS,
91            aggregate_signature: AggregateSignature::new(
92                self.state.validator_set.validator_count(),
93            ),
94        };
95        self.state.highest_qc = Some(genesis_qc);
96
97        let view = ViewNumber(1);
98        view_protocol::enter_view(
99            &mut self.state,
100            view,
101            ViewEntryTrigger::Genesis,
102            self.network.as_ref(),
103            self.signer.as_ref(),
104        );
105        self.pacemaker.reset_timer();
106
107        // If leader of genesis view, propose immediately
108        if self.state.is_leader() {
109            self.state.step = ViewStep::WaitingForStatus;
110            // In genesis, skip status wait — propose directly
111            self.try_propose();
112        }
113    }
114
115    fn try_propose(&mut self) {
116        let mut store = self.store.write().unwrap();
117        match view_protocol::propose(
118            &mut self.state,
119            store.as_mut(),
120            self.network.as_ref(),
121            self.app.as_ref(),
122            self.signer.as_ref(),
123        ) {
124            Ok(block) => {
125                drop(store);
126                // Leader votes for its own block
127                self.leader_self_vote(block.hash);
128            }
129            Err(e) => {
130                warn!(
131                    validator = %self.state.validator_id,
132                    error = %e,
133                    "failed to propose"
134                );
135            }
136        }
137    }
138
139    fn leader_self_vote(&mut self, block_hash: BlockHash) {
140        let vote_bytes = Vote::signing_bytes(self.state.current_view, &block_hash, VoteType::Vote);
141        let signature = self.signer.sign(&vote_bytes);
142        let vote = Vote {
143            block_hash,
144            view: self.state.current_view,
145            validator: self.state.validator_id,
146            signature,
147            vote_type: VoteType::Vote,
148        };
149        match self
150            .vote_collector
151            .add_vote(&self.state.validator_set, vote)
152        {
153            Ok(result) => {
154                self.handle_equivocation(&result);
155                if let Some(qc) = result.qc {
156                    self.on_qc_formed(qc);
157                }
158            }
159            Err(e) => warn!(error = %e, "failed to add self vote"),
160        }
161    }
162
163    fn handle_message(&mut self, _sender: ValidatorId, msg: ConsensusMessage) -> Result<()> {
164        match msg {
165            ConsensusMessage::Propose {
166                block,
167                justify,
168                double_cert,
169                signature: _,
170            } => {
171                let block = *block;
172                let justify = *justify;
173                let double_cert = double_cert.map(|dc| *dc);
174
175                // If proposal is from a future view, advance to it first
176                if block.view > self.state.current_view {
177                    if let Some(ref dc) = double_cert {
178                        // Fast-forward via double cert
179                        let store = self.store.read().unwrap();
180                        match try_commit(
181                            dc,
182                            store.as_ref(),
183                            self.app.as_ref(),
184                            &mut self.state.last_committed_height,
185                            &self.state.current_epoch,
186                        ) {
187                            Ok(result) => {
188                                if result.pending_epoch.is_some() {
189                                    self.pending_epoch = result.pending_epoch;
190                                }
191                            }
192                            Err(e) => {
193                                warn!(error = %e, "try_commit failed during fast-forward");
194                            }
195                        }
196                        drop(store);
197                        self.state.highest_double_cert = Some(dc.clone());
198                        self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()));
199                    } else {
200                        return Ok(());
201                    }
202                } else if block.view < self.state.current_view {
203                    return Ok(());
204                }
205
206                let mut store = self.store.write().unwrap();
207                let maybe_pending = view_protocol::on_proposal(
208                    &mut self.state,
209                    block,
210                    justify,
211                    double_cert,
212                    store.as_mut(),
213                    self.network.as_ref(),
214                    self.app.as_ref(),
215                    self.signer.as_ref(),
216                )
217                .c(d!())?;
218                drop(store);
219
220                if let Some(epoch) = maybe_pending {
221                    self.pending_epoch = Some(epoch);
222                }
223            }
224
225            ConsensusMessage::VoteMsg(vote) => {
226                if vote.view != self.state.current_view {
227                    return Ok(());
228                }
229                if !self.state.is_leader() {
230                    return Ok(());
231                }
232                if vote.vote_type != VoteType::Vote {
233                    return Ok(());
234                }
235
236                let result = self
237                    .vote_collector
238                    .add_vote(&self.state.validator_set, vote)
239                    .c(d!())?;
240                self.handle_equivocation(&result);
241                if let Some(qc) = result.qc {
242                    self.on_qc_formed(qc);
243                }
244            }
245
246            ConsensusMessage::Prepare {
247                certificate,
248                signature: _,
249            } => {
250                if certificate.view < self.state.current_view {
251                    return Ok(());
252                }
253                if certificate.view == self.state.current_view {
254                    view_protocol::on_prepare(
255                        &mut self.state,
256                        certificate,
257                        self.network.as_ref(),
258                        self.signer.as_ref(),
259                    );
260                }
261            }
262
263            ConsensusMessage::Vote2Msg(vote) => {
264                if vote.vote_type != VoteType::Vote2 {
265                    return Ok(());
266                }
267                let result = self
268                    .vote_collector
269                    .add_vote(&self.state.validator_set, vote)
270                    .c(d!())?;
271                self.handle_equivocation(&result);
272                if let Some(outer_qc) = result.qc {
273                    self.on_double_cert_formed(outer_qc);
274                }
275            }
276
277            ConsensusMessage::Wish {
278                target_view,
279                validator,
280                highest_qc,
281                signature,
282            } => {
283                if let Some(tc) = self.pacemaker.add_wish(
284                    &self.state.validator_set,
285                    target_view,
286                    validator,
287                    highest_qc,
288                    signature,
289                ) {
290                    info!(
291                        validator = %self.state.validator_id,
292                        view = %tc.view,
293                        "TC formed, advancing view"
294                    );
295                    self.network
296                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
297                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
298                }
299            }
300
301            ConsensusMessage::TimeoutCert(tc) => {
302                if self.pacemaker.should_relay_tc(&tc) {
303                    self.network
304                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
305                }
306                let new_view = ViewNumber(tc.view.as_u64() + 1);
307                if new_view > self.state.current_view {
308                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
309                }
310            }
311
312            ConsensusMessage::StatusCert {
313                locked_qc,
314                validator: _,
315                signature: _,
316            } => {
317                if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
318                    if let Some(ref qc) = locked_qc {
319                        self.state.update_highest_qc(qc);
320                    }
321                    self.status_count += 1;
322                    let needed = self.state.validator_set.quorum_threshold() as usize - 1;
323                    if self.status_count >= needed {
324                        self.try_propose();
325                    }
326                }
327            }
328        }
329        Ok(())
330    }
331
332    fn handle_equivocation(&self, result: &crate::vote_collector::VoteResult) {
333        if let Some(ref proof) = result.equivocation {
334            warn!(
335                validator = %proof.validator,
336                view = %proof.view,
337                "equivocation detected!"
338            );
339            if let Err(e) = self.app.on_evidence(proof) {
340                warn!(error = %e, "on_evidence callback failed");
341            }
342        }
343    }
344
345    fn on_qc_formed(&mut self, qc: QuorumCertificate) {
346        // Save the QC so we can reliably pair it when forming a DoubleCert
347        self.current_view_qc = Some(qc.clone());
348
349        view_protocol::on_votes_collected(
350            &mut self.state,
351            qc.clone(),
352            self.network.as_ref(),
353            self.signer.as_ref(),
354        );
355
356        // Leader also does vote2 for its own prepare (self-vote for step 5)
357        let vote_bytes =
358            Vote::signing_bytes(self.state.current_view, &qc.block_hash, VoteType::Vote2);
359        let signature = self.signer.sign(&vote_bytes);
360        let vote = Vote {
361            block_hash: qc.block_hash,
362            view: self.state.current_view,
363            validator: self.state.validator_id,
364            signature,
365            vote_type: VoteType::Vote2,
366        };
367
368        // Lock on this QC
369        self.state.update_locked_qc(&qc);
370
371        let next_leader_id =
372            leader::next_leader(&self.state.validator_set, self.state.current_view);
373        if next_leader_id == self.state.validator_id {
374            // We are the next leader, collect vote2 locally
375            match self
376                .vote_collector
377                .add_vote(&self.state.validator_set, vote)
378            {
379                Ok(result) => {
380                    self.handle_equivocation(&result);
381                    if let Some(outer_qc) = result.qc {
382                        self.on_double_cert_formed(outer_qc);
383                    }
384                }
385                Err(e) => warn!(error = %e, "failed to add self vote2"),
386            }
387        } else {
388            self.network
389                .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
390        }
391    }
392
393    fn on_double_cert_formed(&mut self, outer_qc: QuorumCertificate) {
394        // Use the QC we explicitly saved from this view's first voting round
395        let inner_qc = match self.current_view_qc.take() {
396            Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
397            _ => {
398                // Fallback to locked_qc or highest_qc
399                match &self.state.locked_qc {
400                    Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
401                    _ => match &self.state.highest_qc {
402                        Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
403                        _ => {
404                            warn!(
405                                validator = %self.state.validator_id,
406                                "double cert formed but can't find matching inner QC"
407                            );
408                            return;
409                        }
410                    },
411                }
412            }
413        };
414
415        let dc = DoubleCertificate { inner_qc, outer_qc };
416
417        info!(
418            validator = %self.state.validator_id,
419            view = %self.state.current_view,
420            hash = %dc.inner_qc.block_hash,
421            "double certificate formed, committing"
422        );
423
424        // Commit
425        {
426            let store = self.store.read().unwrap();
427            match try_commit(
428                &dc,
429                store.as_ref(),
430                self.app.as_ref(),
431                &mut self.state.last_committed_height,
432                &self.state.current_epoch,
433            ) {
434                Ok(result) => {
435                    if result.pending_epoch.is_some() {
436                        self.pending_epoch = result.pending_epoch;
437                    }
438                }
439                Err(e) => {
440                    warn!(error = %e, "try_commit failed in double cert handler");
441                }
442            }
443        }
444
445        self.state.highest_double_cert = Some(dc.clone());
446
447        // Advance to next view — as new leader, include DC in proposal
448        self.advance_view(ViewEntryTrigger::DoubleCert(dc));
449    }
450
451    fn handle_timeout(&mut self) {
452        info!(
453            validator = %self.state.validator_id,
454            view = %self.state.current_view,
455            "view timeout, sending wish"
456        );
457
458        let wish = self.pacemaker.build_wish(
459            self.state.current_view,
460            self.state.validator_id,
461            self.state.highest_qc.clone(),
462            self.signer.as_ref(),
463        );
464
465        self.network.broadcast(wish.clone());
466
467        // Also process our own wish
468        if let ConsensusMessage::Wish {
469            target_view,
470            validator,
471            highest_qc,
472            signature,
473        } = wish
474            && let Some(tc) = self.pacemaker.add_wish(
475                &self.state.validator_set,
476                target_view,
477                validator,
478                highest_qc,
479                signature,
480            )
481        {
482            self.network
483                .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
484            self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
485            return;
486        }
487
488        // Exponential backoff on repeated timeouts
489        self.pacemaker.on_timeout();
490    }
491
492    fn advance_view(&mut self, trigger: ViewEntryTrigger) {
493        let new_view = match &trigger {
494            ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
495            ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
496            ViewEntryTrigger::Genesis => ViewNumber(1),
497        };
498        self.advance_view_to(new_view, trigger);
499    }
500
501    fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
502        if new_view <= self.state.current_view {
503            return;
504        }
505
506        // Reset backoff on successful progress (DoubleCert path)
507        let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
508
509        self.vote_collector.clear_view(self.state.current_view);
510        self.pacemaker.clear_view(self.state.current_view);
511        self.status_count = 0;
512        self.current_view_qc = None;
513
514        // Epoch transition: apply pending validator set change before entering new view
515        if let Some(mut new_epoch) = self.pending_epoch.take() {
516            new_epoch.start_view = new_view;
517            info!(
518                validator = %self.state.validator_id,
519                old_epoch = %self.state.current_epoch.number,
520                new_epoch = %new_epoch.number,
521                start_view = %new_view,
522                validators = new_epoch.validator_set.validator_count(),
523                "epoch transition"
524            );
525            self.state.validator_set = new_epoch.validator_set.clone();
526            self.state.current_epoch = new_epoch;
527            // Full clear: old votes/wishes are from the previous epoch's validator set
528            self.vote_collector = VoteCollector::new();
529            self.pacemaker = Pacemaker::new();
530        }
531
532        view_protocol::enter_view(
533            &mut self.state,
534            new_view,
535            trigger,
536            self.network.as_ref(),
537            self.signer.as_ref(),
538        );
539
540        if is_progress {
541            self.pacemaker.reset_on_progress();
542        } else {
543            self.pacemaker.reset_timer();
544        }
545
546        // If we're the leader, we may need to propose
547        if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
548            // In simplified version, leader proposes immediately
549            self.try_propose();
550        }
551    }
552}