Skip to main content

hotmint_consensus/
view_protocol.rs

1use ruc::*;
2
3use crate::application::Application;
4use crate::commit::{CommitResult, try_commit};
5use crate::leader;
6use crate::network::NetworkSink;
7use crate::state::{ConsensusState, ViewRole, ViewStep};
8use crate::store::BlockStore;
9use hotmint_crypto::hash::compute_block_hash;
10use hotmint_types::context::BlockContext;
11use hotmint_types::epoch::{Epoch, EpochNumber};
12use hotmint_types::evidence::EquivocationProof;
13use hotmint_types::vote::VoteType;
14use hotmint_types::*;
15use tracing::{debug, info, warn};
16
17/// Trigger that causes us to enter a new view
18pub enum ViewEntryTrigger {
19    DoubleCert(DoubleCertificate),
20    TimeoutCert(TimeoutCertificate),
21    Genesis,
22}
23
24/// Execute step (1): Enter view
25pub fn enter_view(
26    state: &mut ConsensusState,
27    view: ViewNumber,
28    trigger: ViewEntryTrigger,
29    network: &dyn NetworkSink,
30    signer: &dyn Signer,
31) {
32    state.current_view = view;
33    state.step = ViewStep::Entered;
34
35    let am_leader = leader::is_leader(&state.validator_set, view, state.validator_id);
36    state.role = if am_leader {
37        ViewRole::Leader
38    } else {
39        ViewRole::Replica
40    };
41
42    info!(
43        validator = %state.validator_id,
44        view = %view,
45        role = ?state.role,
46        epoch = %state.current_epoch.number,
47        "entering view"
48    );
49
50    match trigger {
51        ViewEntryTrigger::Genesis => {
52            if am_leader {
53                // Genesis leader enters WaitingForStatus; engine calls try_propose() directly
54                state.step = ViewStep::WaitingForStatus;
55            } else {
56                state.step = ViewStep::WaitingForProposal;
57            }
58        }
59        ViewEntryTrigger::DoubleCert(dc) => {
60            state.update_highest_qc(&dc.outer_qc);
61            state.highest_double_cert = Some(dc);
62            if am_leader {
63                state.step = ViewStep::WaitingForStatus;
64            } else {
65                // Send status to new leader
66                let leader_id = state
67                    .validator_set
68                    .leader_for_view(view)
69                    .expect("empty validator set")
70                    .id;
71                let msg_bytes = status_signing_bytes(
72                    &state.chain_id_hash,
73                    state.current_epoch.number,
74                    view,
75                    &state.locked_qc,
76                );
77                let sig = signer.sign(&msg_bytes);
78                network.send_to(
79                    leader_id,
80                    ConsensusMessage::StatusCert {
81                        locked_qc: state.locked_qc.clone(),
82                        validator: state.validator_id,
83                        signature: sig,
84                    },
85                );
86                state.step = ViewStep::WaitingForProposal;
87            }
88        }
89        ViewEntryTrigger::TimeoutCert(tc) => {
90            if let Some(hqc) = tc.highest_qc() {
91                state.update_highest_qc(hqc);
92            }
93            if am_leader {
94                state.step = ViewStep::WaitingForStatus;
95            } else {
96                let leader_id = state
97                    .validator_set
98                    .leader_for_view(view)
99                    .expect("empty validator set")
100                    .id;
101                let msg_bytes = status_signing_bytes(
102                    &state.chain_id_hash,
103                    state.current_epoch.number,
104                    view,
105                    &state.locked_qc,
106                );
107                let sig = signer.sign(&msg_bytes);
108                network.send_to(
109                    leader_id,
110                    ConsensusMessage::StatusCert {
111                        locked_qc: state.locked_qc.clone(),
112                        validator: state.validator_id,
113                        signature: sig,
114                    },
115                );
116                state.step = ViewStep::WaitingForProposal;
117            }
118        }
119    }
120}
121
122/// Execute step (2): Leader proposes
123pub fn propose(
124    state: &mut ConsensusState,
125    store: &mut dyn BlockStore,
126    network: &dyn NetworkSink,
127    app: &dyn Application,
128    signer: &dyn Signer,
129    evidence: Vec<EquivocationProof>,
130) -> Result<Block> {
131    let justify = state
132        .highest_qc
133        .clone()
134        .c(d!("no QC to justify proposal"))?;
135
136    let parent_hash = justify.block_hash;
137    let parent = store
138        .get_block(&parent_hash)
139        .c(d!("parent block not found"))?;
140    let height = parent.height.next();
141
142    // BFT Time: proposer sets timestamp = max(now, parent.timestamp + 1).
143    let now_ms = std::time::SystemTime::now()
144        .duration_since(std::time::UNIX_EPOCH)
145        .unwrap_or_default()
146        .as_millis() as u64;
147    let timestamp = std::cmp::max(now_ms, parent.timestamp.saturating_add(1));
148
149    let ctx = BlockContext {
150        height,
151        view: state.current_view,
152        proposer: state.validator_id,
153        epoch: state.current_epoch.number,
154        epoch_start_view: state.current_epoch.start_view,
155        validator_set: &state.validator_set,
156        timestamp,
157        vote_extensions: std::mem::take(&mut state.pending_vote_extensions),
158    };
159
160    let payload = app.create_payload(&ctx);
161
162    if !evidence.is_empty() {
163        info!(
164            validator = %state.validator_id,
165            count = evidence.len(),
166            "embedding equivocation evidence in block"
167        );
168    }
169
170    let mut block = Block {
171        height,
172        parent_hash,
173        view: state.current_view,
174        proposer: state.validator_id,
175        timestamp,
176        payload,
177        app_hash: state.last_app_hash,
178        evidence,
179        hash: BlockHash::GENESIS, // placeholder
180    };
181    block.hash = compute_block_hash(&block);
182
183    store.put_block(block.clone());
184
185    let msg_bytes = proposal_signing_bytes(
186        &state.chain_id_hash,
187        state.current_epoch.number,
188        &block,
189        &justify,
190    );
191    let signature = signer.sign(&msg_bytes);
192
193    info!(
194        validator = %state.validator_id,
195        view = %state.current_view,
196        height = height.as_u64(),
197        hash = %block.hash,
198        "proposing block"
199    );
200
201    network.broadcast(ConsensusMessage::Propose {
202        block: Box::new(block.clone()),
203        justify: Box::new(justify),
204        double_cert: state.highest_double_cert.clone().map(Box::new),
205        signature,
206    });
207
208    state.step = ViewStep::CollectingVotes;
209    Ok(block)
210}
211
212/// Incoming proposal data from the leader.
213pub struct ProposalData {
214    pub block: Block,
215    pub justify: QuorumCertificate,
216    pub double_cert: Option<DoubleCertificate>,
217}
218
219/// Execute step (3): Replica receives proposal, validates, votes.
220/// Returns `Option<Epoch>` if fast-forward commit triggered an epoch change.
221/// Result of on_proposal that may include a fast-forward commit.
222pub struct ProposalResult {
223    /// Pending epoch from the fast-forward commit (if any).
224    pub pending_epoch: Option<Epoch>,
225    /// CommitResult from the fast-forward DC commit (if any).
226    /// The engine must process this for WAL, tx indexing, evidence marking, etc.
227    pub commit_result: Option<CommitResult>,
228}
229
230pub fn on_proposal(
231    state: &mut ConsensusState,
232    proposal: ProposalData,
233    store: &mut dyn BlockStore,
234    network: &dyn NetworkSink,
235    app: &dyn Application,
236    signer: &dyn Signer,
237) -> Result<ProposalResult> {
238    let ProposalData {
239        block,
240        justify,
241        double_cert,
242    } = proposal;
243    if state.step != ViewStep::WaitingForProposal {
244        debug!(
245            validator = %state.validator_id,
246            step = ?state.step,
247            "ignoring proposal, not waiting"
248        );
249        return Ok(ProposalResult {
250            pending_epoch: None,
251            commit_result: None,
252        });
253    }
254
255    // Safety check: justify.rank() >= locked_qc.rank()
256    if let Some(ref locked) = state.locked_qc
257        && justify.rank() < locked.rank()
258    {
259        warn!(
260            validator = %state.validator_id,
261            justify_view = %justify.view,
262            locked_view = %locked.view,
263            "rejecting proposal: justify rank < locked rank"
264        );
265        return Err(eg!("proposal justify rank below locked QC rank"));
266    }
267
268    // Verify proposer is the rightful leader for this view
269    let expected_leader = state
270        .validator_set
271        .leader_for_view(block.view)
272        .ok_or_else(|| eg!("empty validator set"))?
273        .id;
274    if block.proposer != expected_leader {
275        return Err(eg!(
276            "block proposer {} is not leader {} for view {}",
277            block.proposer,
278            expected_leader,
279            block.view
280        ));
281    }
282
283    // Verify block hash integrity
284    let expected_hash = hotmint_crypto::compute_block_hash(&block);
285    if block.hash != expected_hash {
286        return Err(eg!(
287            "block hash mismatch: declared {} != computed {}",
288            block.hash,
289            expected_hash
290        ));
291    }
292
293    // C-4: Verify block extends the chain certified by the justify QC.
294    // The genesis QC certifies BlockHash::GENESIS, and the first block's
295    // parent is also GENESIS, so this check is correct for height 1 as well.
296    if block.parent_hash != justify.block_hash {
297        return Err(eg!(
298            "block parent_hash {} does not match justify block_hash {}",
299            block.parent_hash,
300            justify.block_hash
301        ));
302    }
303
304    // BFT Time: verify the block timestamp is monotonically non-decreasing
305    // and within a reasonable drift window of the local clock.
306    // Skip only for genesis (height 0); height 1+ MUST have valid timestamps.
307    if block.height.as_u64() > 0 {
308        if block.timestamp == 0 {
309            return Err(eg!("non-genesis block has timestamp 0"));
310        }
311        // Check monotonicity against parent block (if available).
312        if let Some(parent) = store.get_block(&block.parent_hash)
313            && block.timestamp < parent.timestamp
314        {
315            return Err(eg!(
316                "block timestamp {} < parent timestamp {}",
317                block.timestamp,
318                parent.timestamp
319            ));
320        }
321        // Allow up to 15 seconds of clock drift into the future.
322        let now_ms = std::time::SystemTime::now()
323            .duration_since(std::time::UNIX_EPOCH)
324            .unwrap_or_default()
325            .as_millis() as u64;
326        const MAX_FUTURE_DRIFT_MS: u64 = 15_000;
327        if block.timestamp > now_ms + MAX_FUTURE_DRIFT_MS {
328            return Err(eg!(
329                "block timestamp {} is too far in the future (local: {})",
330                block.timestamp,
331                now_ms
332            ));
333        }
334    }
335
336    let ctx = BlockContext {
337        height: block.height,
338        view: block.view,
339        proposer: block.proposer,
340        epoch: state.current_epoch.number,
341        epoch_start_view: state.current_epoch.start_view,
342        validator_set: &state.validator_set,
343        timestamp: block.timestamp,
344        vote_extensions: vec![],
345    };
346
347    if !app.validate_block(&block, &ctx) {
348        return Err(eg!("application rejected block"));
349    }
350
351    // Store the block
352    store.put_block(block.clone());
353
354    // Update highest QC
355    state.update_highest_qc(&justify);
356
357    // Fast-forward commit via double cert (if present).
358    // IMPORTANT: this MUST happen BEFORE the app_hash check below.
359    // block.app_hash = state-after-parent, and the DC in this proposal commits
360    // the parent block, producing that exact state root.  Processing the DC
361    // first keeps state.last_app_hash in sync so the check below is consistent
362    // for both the leader (who committed the parent independently) and replicas
363    // (who commit the parent only via this DC).
364    let mut pending_epoch = None;
365    let mut fast_forward_commit = None;
366    if let Some(ref dc) = double_cert {
367        match try_commit(
368            dc,
369            store,
370            app,
371            &mut state.last_committed_height,
372            &state.current_epoch,
373        ) {
374            Ok(result) => {
375                if !result.committed_blocks.is_empty() {
376                    state.last_app_hash = result.last_app_hash;
377                }
378                pending_epoch = result.pending_epoch.clone();
379                fast_forward_commit = Some(result);
380            }
381            Err(e) => {
382                return Err(eg!("try_commit failed during fast-forward: {}", e));
383            }
384        }
385    }
386
387    // Verify app_hash matches local state AFTER fast-forward commit.
388    // Skip when the application does not track state roots (e.g. fullnode
389    // running NoopApplication against a chain produced by a real ABCI app).
390    if app.tracks_app_hash() && block.app_hash != state.last_app_hash {
391        return Err(eg!(
392            "app_hash mismatch: block {} != local {}",
393            block.app_hash,
394            state.last_app_hash
395        ));
396    }
397
398    // Vote (first phase) → send to current leader (only if we have voting power)
399    if state.validator_set.power_of(state.validator_id) > 0 {
400        let vote_bytes = Vote::signing_bytes(
401            &state.chain_id_hash,
402            state.current_epoch.number,
403            state.current_view,
404            &block.hash,
405            VoteType::Vote,
406        );
407        let signature = signer.sign(&vote_bytes);
408        let vote = Vote {
409            block_hash: block.hash,
410            view: state.current_view,
411            validator: state.validator_id,
412            signature,
413            vote_type: VoteType::Vote,
414            extension: None,
415        };
416
417        let leader_id = state
418            .validator_set
419            .leader_for_view(state.current_view)
420            .expect("empty validator set")
421            .id;
422        info!(
423            validator = %state.validator_id,
424            view = %state.current_view,
425            hash = %block.hash,
426            "voting for block"
427        );
428        network.send_to(leader_id, ConsensusMessage::VoteMsg(vote));
429    }
430
431    state.step = ViewStep::Voted;
432    Ok(ProposalResult {
433        pending_epoch,
434        commit_result: fast_forward_commit,
435    })
436}
437
438/// Execute step (4): Leader collected 2f+1 votes → form QC → broadcast prepare
439pub fn on_votes_collected(
440    state: &mut ConsensusState,
441    qc: QuorumCertificate,
442    network: &dyn NetworkSink,
443    signer: &dyn Signer,
444) {
445    info!(
446        validator = %state.validator_id,
447        view = %state.current_view,
448        hash = %qc.block_hash,
449        "QC formed, broadcasting prepare"
450    );
451
452    state.update_highest_qc(&qc);
453
454    let msg_bytes = prepare_signing_bytes(&state.chain_id_hash, state.current_epoch.number, &qc);
455    let signature = signer.sign(&msg_bytes);
456
457    network.broadcast(ConsensusMessage::Prepare {
458        certificate: qc,
459        signature,
460    });
461
462    state.step = ViewStep::Prepared;
463}
464
465/// Execute step (5): Replica receives prepare → update lock → send vote2 to next leader
466///
467/// `vote_extension` is an optional ABCI++ vote extension to attach to the
468/// Vote2 message.  The caller (engine) is responsible for generating the
469/// extension via `Application::extend_vote` before invoking this function.
470pub fn on_prepare(
471    state: &mut ConsensusState,
472    qc: QuorumCertificate,
473    network: &dyn NetworkSink,
474    signer: &dyn Signer,
475    vote_extension: Option<Vec<u8>>,
476) {
477    // C-1: Guard — only accept Prepare if we have already voted (sent Vote1).
478    // Without this check, a Byzantine leader could cause us to lock a QC and
479    // emit Vote2 for a block we never validated in the first phase.
480    if state.step != ViewStep::Voted {
481        debug!(
482            validator = %state.validator_id,
483            step = ?state.step,
484            view = %state.current_view,
485            "ignoring prepare: not in Voted step"
486        );
487        return;
488    }
489
490    // Update lock to this QC
491    state.update_locked_qc(&qc);
492    state.update_highest_qc(&qc);
493
494    // Vote2 → send to next leader (only if we have voting power)
495    if state.validator_set.power_of(state.validator_id) > 0 {
496        let vote_bytes = Vote::signing_bytes(
497            &state.chain_id_hash,
498            state.current_epoch.number,
499            state.current_view,
500            &qc.block_hash,
501            VoteType::Vote2,
502        );
503        let signature = signer.sign(&vote_bytes);
504        let vote = Vote {
505            block_hash: qc.block_hash,
506            view: state.current_view,
507            validator: state.validator_id,
508            signature,
509            vote_type: VoteType::Vote2,
510            extension: vote_extension,
511        };
512
513        let next_leader_id = leader::next_leader(&state.validator_set, state.current_view);
514        info!(
515            validator = %state.validator_id,
516            view = %state.current_view,
517            hash = %qc.block_hash,
518            "sending vote2 to next leader {}",
519            next_leader_id
520        );
521        network.send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
522    }
523
524    state.step = ViewStep::SentVote2;
525}
526
527// --- Signing helpers ---
528
529pub(crate) fn status_signing_bytes(
530    chain_id_hash: &[u8; 32],
531    epoch: EpochNumber,
532    view: ViewNumber,
533    locked_qc: &Option<QuorumCertificate>,
534) -> Vec<u8> {
535    let tag = b"HOTMINT_STATUS_V1\0";
536    let mut buf = Vec::with_capacity(tag.len() + 32 + 8 + 8 + 40);
537    buf.extend_from_slice(tag);
538    buf.extend_from_slice(chain_id_hash);
539    buf.extend_from_slice(&epoch.as_u64().to_le_bytes());
540    buf.extend_from_slice(&view.as_u64().to_le_bytes());
541    if let Some(qc) = locked_qc {
542        buf.extend_from_slice(&qc.block_hash.0);
543        buf.extend_from_slice(&qc.view.as_u64().to_le_bytes());
544    }
545    buf
546}
547
548pub(crate) fn proposal_signing_bytes(
549    chain_id_hash: &[u8; 32],
550    epoch: EpochNumber,
551    block: &Block,
552    justify: &QuorumCertificate,
553) -> Vec<u8> {
554    let tag = b"HOTMINT_PROPOSAL_V1\0";
555    let mut buf = Vec::with_capacity(tag.len() + 32 + 8 + 32 + 32 + 8);
556    buf.extend_from_slice(tag);
557    buf.extend_from_slice(chain_id_hash);
558    buf.extend_from_slice(&epoch.as_u64().to_le_bytes());
559    buf.extend_from_slice(&block.hash.0);
560    buf.extend_from_slice(&justify.block_hash.0);
561    buf.extend_from_slice(&justify.view.as_u64().to_le_bytes());
562    buf
563}
564
565pub(crate) fn prepare_signing_bytes(
566    chain_id_hash: &[u8; 32],
567    epoch: EpochNumber,
568    qc: &QuorumCertificate,
569) -> Vec<u8> {
570    let tag = b"HOTMINT_PREPARE_V1\0";
571    let mut buf = Vec::with_capacity(tag.len() + 32 + 8 + 32 + 8);
572    buf.extend_from_slice(tag);
573    buf.extend_from_slice(chain_id_hash);
574    buf.extend_from_slice(&epoch.as_u64().to_le_bytes());
575    buf.extend_from_slice(&qc.block_hash.0);
576    buf.extend_from_slice(&qc.view.as_u64().to_le_bytes());
577    buf
578}