// hotmint_consensus/engine.rs

1use ruc::*;
2
3use crate::application::Application;
4use crate::commit::try_commit;
5use crate::leader;
6use crate::network::NetworkSink;
7use crate::pacemaker::Pacemaker;
8use crate::state::{ConsensusState, ViewStep};
9use crate::store::BlockStore;
10use crate::view_protocol::{self, ViewEntryTrigger};
11use crate::vote_collector::VoteCollector;
12
13use hotmint_types::vote::VoteType;
14use hotmint_types::*;
15use tokio::sync::mpsc;
16use tracing::{debug, info, warn};
17
18pub struct ConsensusEngine {
19    state: ConsensusState,
20    store: Box<dyn BlockStore>,
21    network: Box<dyn NetworkSink>,
22    app: Box<dyn Application>,
23    signer: Box<dyn Signer>,
24    vote_collector: VoteCollector,
25    pacemaker: Pacemaker,
26    msg_rx: mpsc::UnboundedReceiver<(ValidatorId, ConsensusMessage)>,
27    /// Collected status certs from replicas (for leader)
28    status_count: usize,
29    /// The QC formed in this view's first voting round (used to build DoubleCert)
30    current_view_qc: Option<QuorumCertificate>,
31}
32
33impl ConsensusEngine {
34    pub fn new(
35        state: ConsensusState,
36        store: Box<dyn BlockStore>,
37        network: Box<dyn NetworkSink>,
38        app: Box<dyn Application>,
39        signer: Box<dyn Signer>,
40        msg_rx: mpsc::UnboundedReceiver<(ValidatorId, ConsensusMessage)>,
41    ) -> Self {
42        Self {
43            state,
44            store,
45            network,
46            app,
47            signer,
48            vote_collector: VoteCollector::new(),
49            pacemaker: Pacemaker::new(),
50            msg_rx,
51            status_count: 0,
52            current_view_qc: None,
53        }
54    }
55
56    /// Bootstrap: enter genesis view and start the event loop
57    pub async fn run(mut self) {
58        self.enter_genesis_view();
59
60        loop {
61            let deadline = self.pacemaker.sleep_until_deadline();
62            tokio::pin!(deadline);
63
64            tokio::select! {
65                Some((sender, msg)) = self.msg_rx.recv() => {
66                    if let Err(e) = self.handle_message(sender, msg) {
67                        warn!(validator = %self.state.validator_id, error = %e, "error handling message");
68                    }
69                }
70                _ = &mut deadline => {
71                    self.handle_timeout();
72                }
73            }
74        }
75    }
76
77    fn enter_genesis_view(&mut self) {
78        // Create a synthetic genesis QC so the first leader can propose
79        let genesis_qc = QuorumCertificate {
80            block_hash: BlockHash::GENESIS,
81            view: ViewNumber::GENESIS,
82            aggregate_signature: AggregateSignature::new(
83                self.state.validator_set.validator_count(),
84            ),
85        };
86        self.state.highest_qc = Some(genesis_qc);
87
88        let view = ViewNumber(1);
89        view_protocol::enter_view(
90            &mut self.state,
91            view,
92            ViewEntryTrigger::Genesis,
93            self.network.as_ref(),
94            self.signer.as_ref(),
95        );
96        self.pacemaker.reset_timer();
97
98        // If leader of genesis view, propose immediately
99        if self.state.is_leader() {
100            self.state.step = ViewStep::WaitingForStatus;
101            // In genesis, skip status wait — propose directly
102            self.try_propose();
103        }
104    }
105
106    fn try_propose(&mut self) {
107        match view_protocol::propose(
108            &mut self.state,
109            self.store.as_mut(),
110            self.network.as_ref(),
111            self.app.as_ref(),
112            self.signer.as_ref(),
113        ) {
114            Ok(block) => {
115                // Leader votes for its own block
116                self.leader_self_vote(block.hash);
117            }
118            Err(e) => {
119                warn!(
120                    validator = %self.state.validator_id,
121                    error = %e,
122                    "failed to propose"
123                );
124            }
125        }
126    }
127
128    fn leader_self_vote(&mut self, block_hash: BlockHash) {
129        let vote_bytes = Vote::signing_bytes(self.state.current_view, &block_hash, VoteType::Vote);
130        let signature = self.signer.sign(&vote_bytes);
131        let vote = Vote {
132            block_hash,
133            view: self.state.current_view,
134            validator: self.state.validator_id,
135            signature,
136            vote_type: VoteType::Vote,
137        };
138        if let Ok(Some(formed_qc)) = self
139            .vote_collector
140            .add_vote(&self.state.validator_set, vote)
141        {
142            self.on_qc_formed(formed_qc);
143        }
144    }
145
146    fn handle_message(&mut self, _sender: ValidatorId, msg: ConsensusMessage) -> Result<()> {
147        match msg {
148            ConsensusMessage::Propose {
149                block,
150                justify,
151                double_cert,
152                signature: _,
153            } => {
154                let block = *block;
155                let justify = *justify;
156                let double_cert = double_cert.map(|dc| *dc);
157
158                // If proposal is from a future view, advance to it first
159                if block.view > self.state.current_view {
160                    if let Some(ref dc) = double_cert {
161                        // Fast-forward via double cert
162                        if let Err(e) = try_commit(
163                            dc,
164                            self.store.as_ref(),
165                            self.app.as_ref(),
166                            &mut self.state.last_committed_height,
167                        ) {
168                            warn!(error = %e, "try_commit failed during fast-forward");
169                        }
170                        self.state.highest_double_cert = Some(dc.clone());
171                        self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()));
172                    } else {
173                        debug!(
174                            validator = %self.state.validator_id,
175                            block_view = %block.view,
176                            current_view = %self.state.current_view,
177                            "ignoring proposal from future view without double cert"
178                        );
179                        return Ok(());
180                    }
181                } else if block.view < self.state.current_view {
182                    return Ok(());
183                }
184
185                view_protocol::on_proposal(
186                    &mut self.state,
187                    block,
188                    justify,
189                    double_cert,
190                    self.store.as_mut(),
191                    self.network.as_ref(),
192                    self.app.as_ref(),
193                    self.signer.as_ref(),
194                )
195                .c(d!())?;
196            }
197
198            ConsensusMessage::VoteMsg(vote) => {
199                if vote.view != self.state.current_view {
200                    return Ok(());
201                }
202                if !self.state.is_leader() {
203                    return Ok(());
204                }
205                if vote.vote_type != VoteType::Vote {
206                    return Ok(());
207                }
208
209                if let Some(qc) = self
210                    .vote_collector
211                    .add_vote(&self.state.validator_set, vote)
212                    .c(d!())?
213                {
214                    self.on_qc_formed(qc);
215                }
216            }
217
218            ConsensusMessage::Prepare {
219                certificate,
220                signature: _,
221            } => {
222                if certificate.view < self.state.current_view {
223                    return Ok(());
224                }
225                // Accept prepare from current view
226                if certificate.view == self.state.current_view {
227                    view_protocol::on_prepare(
228                        &mut self.state,
229                        certificate,
230                        self.network.as_ref(),
231                        self.signer.as_ref(),
232                    );
233                }
234                // For future-view prepares, we just update our highest QC
235                // (we'll catch up via TC or next proposal)
236            }
237
238            ConsensusMessage::Vote2Msg(vote) => {
239                if vote.vote_type != VoteType::Vote2 {
240                    return Ok(());
241                }
242                // Vote2 is sent to next leader for view+1
243                // Collect it to form double cert
244                if let Some(outer_qc) = self
245                    .vote_collector
246                    .add_vote(&self.state.validator_set, vote)
247                    .c(d!())?
248                {
249                    self.on_double_cert_formed(outer_qc);
250                }
251            }
252
253            ConsensusMessage::Wish {
254                target_view,
255                validator,
256                highest_qc,
257                signature,
258            } => {
259                if let Some(tc) = self.pacemaker.add_wish(
260                    &self.state.validator_set,
261                    target_view,
262                    validator,
263                    highest_qc,
264                    signature,
265                ) {
266                    info!(
267                        validator = %self.state.validator_id,
268                        view = %tc.view,
269                        "TC formed, advancing view"
270                    );
271                    self.network
272                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
273                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
274                }
275            }
276
277            ConsensusMessage::TimeoutCert(tc) => {
278                // TC relay: rebroadcast if not yet relayed
279                if self.pacemaker.should_relay_tc(&tc) {
280                    self.network
281                        .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
282                }
283                let new_view = ViewNumber(tc.view.as_u64() + 1);
284                if new_view > self.state.current_view {
285                    self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
286                }
287            }
288
289            ConsensusMessage::StatusCert {
290                locked_qc,
291                validator: _,
292                signature: _,
293            } => {
294                if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
295                    if let Some(ref qc) = locked_qc {
296                        self.state.update_highest_qc(qc);
297                    }
298                    self.status_count += 1;
299                    let needed = self.state.validator_set.quorum_threshold() as usize - 1;
300                    if self.status_count >= needed {
301                        self.try_propose();
302                    }
303                }
304            }
305        }
306        Ok(())
307    }
308
309    fn on_qc_formed(&mut self, qc: QuorumCertificate) {
310        // Save the QC so we can reliably pair it when forming a DoubleCert
311        self.current_view_qc = Some(qc.clone());
312
313        view_protocol::on_votes_collected(
314            &mut self.state,
315            qc.clone(),
316            self.network.as_ref(),
317            self.signer.as_ref(),
318        );
319
320        // Leader also does vote2 for its own prepare (self-vote for step 5)
321        let vote_bytes =
322            Vote::signing_bytes(self.state.current_view, &qc.block_hash, VoteType::Vote2);
323        let signature = self.signer.sign(&vote_bytes);
324        let vote = Vote {
325            block_hash: qc.block_hash,
326            view: self.state.current_view,
327            validator: self.state.validator_id,
328            signature,
329            vote_type: VoteType::Vote2,
330        };
331
332        // Lock on this QC
333        self.state.update_locked_qc(&qc);
334
335        let next_leader_id =
336            leader::next_leader(&self.state.validator_set, self.state.current_view);
337        if next_leader_id == self.state.validator_id {
338            // We are the next leader, collect vote2 locally
339            if let Ok(Some(outer_qc)) = self
340                .vote_collector
341                .add_vote(&self.state.validator_set, vote)
342            {
343                self.on_double_cert_formed(outer_qc);
344            }
345        } else {
346            self.network
347                .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
348        }
349    }
350
351    fn on_double_cert_formed(&mut self, outer_qc: QuorumCertificate) {
352        // Use the QC we explicitly saved from this view's first voting round
353        let inner_qc = match self.current_view_qc.take() {
354            Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
355            _ => {
356                // Fallback to locked_qc or highest_qc
357                match &self.state.locked_qc {
358                    Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
359                    _ => match &self.state.highest_qc {
360                        Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
361                        _ => {
362                            warn!(
363                                validator = %self.state.validator_id,
364                                "double cert formed but can't find matching inner QC"
365                            );
366                            return;
367                        }
368                    },
369                }
370            }
371        };
372
373        let dc = DoubleCertificate { inner_qc, outer_qc };
374
375        info!(
376            validator = %self.state.validator_id,
377            view = %self.state.current_view,
378            hash = %dc.inner_qc.block_hash,
379            "double certificate formed, committing"
380        );
381
382        // Commit
383        if let Err(e) = try_commit(
384            &dc,
385            self.store.as_ref(),
386            self.app.as_ref(),
387            &mut self.state.last_committed_height,
388        ) {
389            warn!(error = %e, "try_commit failed in double cert handler");
390        }
391
392        self.state.highest_double_cert = Some(dc.clone());
393
394        // Advance to next view — as new leader, include DC in proposal
395        // so other validators can catch up
396        self.advance_view(ViewEntryTrigger::DoubleCert(dc));
397    }
398
399    fn handle_timeout(&mut self) {
400        info!(
401            validator = %self.state.validator_id,
402            view = %self.state.current_view,
403            "view timeout, sending wish"
404        );
405
406        let wish = self.pacemaker.build_wish(
407            self.state.current_view,
408            self.state.validator_id,
409            self.state.highest_qc.clone(),
410            self.signer.as_ref(),
411        );
412
413        self.network.broadcast(wish.clone());
414
415        // Also process our own wish
416        if let ConsensusMessage::Wish {
417            target_view,
418            validator,
419            highest_qc,
420            signature,
421        } = wish
422            && let Some(tc) = self.pacemaker.add_wish(
423                &self.state.validator_set,
424                target_view,
425                validator,
426                highest_qc,
427                signature,
428            )
429        {
430            self.network
431                .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
432            self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
433            return;
434        }
435
436        // Exponential backoff on repeated timeouts
437        self.pacemaker.on_timeout();
438    }
439
440    fn advance_view(&mut self, trigger: ViewEntryTrigger) {
441        let new_view = match &trigger {
442            ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
443            ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
444            ViewEntryTrigger::Genesis => ViewNumber(1),
445        };
446        self.advance_view_to(new_view, trigger);
447    }
448
449    fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
450        if new_view <= self.state.current_view {
451            return;
452        }
453
454        // Reset backoff on successful progress (DoubleCert path)
455        let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
456
457        self.vote_collector.clear_view(self.state.current_view);
458        self.pacemaker.clear_view(self.state.current_view);
459        self.status_count = 0;
460        self.current_view_qc = None;
461
462        view_protocol::enter_view(
463            &mut self.state,
464            new_view,
465            trigger,
466            self.network.as_ref(),
467            self.signer.as_ref(),
468        );
469
470        if is_progress {
471            self.pacemaker.reset_on_progress();
472        } else {
473            self.pacemaker.reset_timer();
474        }
475
476        // If we're the leader, we may need to propose
477        if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
478            // In simplified version, leader proposes immediately
479            self.try_propose();
480        }
481    }
482}