Skip to main content

hotmint_consensus/
view_protocol.rs

1use ruc::*;
2
3use crate::application::Application;
4use crate::commit::{CommitResult, try_commit};
5use crate::leader;
6use crate::network::NetworkSink;
7use crate::state::{ConsensusState, ViewRole, ViewStep};
8use crate::store::BlockStore;
9use hotmint_crypto::hash::compute_block_hash;
10use hotmint_types::context::BlockContext;
11use hotmint_types::epoch::{Epoch, EpochNumber};
12use hotmint_types::evidence::EquivocationProof;
13use hotmint_types::vote::VoteType;
14use hotmint_types::*;
15use tracing::{debug, info, warn};
16
17/// Trigger that causes us to enter a new view
18pub enum ViewEntryTrigger {
19    DoubleCert(DoubleCertificate),
20    TimeoutCert(TimeoutCertificate),
21    Genesis,
22}
23
24/// Execute step (1): Enter view
25pub fn enter_view(
26    state: &mut ConsensusState,
27    view: ViewNumber,
28    trigger: ViewEntryTrigger,
29    network: &dyn NetworkSink,
30    signer: &dyn Signer,
31) {
32    state.current_view = view;
33    state.step = ViewStep::Entered;
34
35    let am_leader = leader::is_leader(&state.validator_set, view, state.validator_id);
36    state.role = if am_leader {
37        ViewRole::Leader
38    } else {
39        ViewRole::Replica
40    };
41
42    info!(
43        validator = %state.validator_id,
44        view = %view,
45        role = ?state.role,
46        epoch = %state.current_epoch.number,
47        "entering view"
48    );
49
50    match trigger {
51        ViewEntryTrigger::Genesis => {
52            if am_leader {
53                // Genesis leader enters WaitingForStatus; engine calls try_propose() directly
54                state.step = ViewStep::WaitingForStatus;
55            } else {
56                state.step = ViewStep::WaitingForProposal;
57            }
58        }
59        ViewEntryTrigger::DoubleCert(dc) => {
60            state.update_highest_qc(&dc.outer_qc);
61            state.highest_double_cert = Some(dc);
62            if am_leader {
63                state.step = ViewStep::WaitingForStatus;
64            } else {
65                // Send status to new leader
66                let leader_id = state
67                    .validator_set
68                    .leader_for_view(view)
69                    .expect("empty validator set")
70                    .id;
71                let msg_bytes = status_signing_bytes(
72                    &state.chain_id_hash,
73                    state.current_epoch.number,
74                    view,
75                    &state.locked_qc,
76                );
77                let sig = signer.sign(&msg_bytes);
78                network.send_to(
79                    leader_id,
80                    ConsensusMessage::StatusCert {
81                        locked_qc: state.locked_qc.clone(),
82                        validator: state.validator_id,
83                        signature: sig,
84                    },
85                );
86                state.step = ViewStep::WaitingForProposal;
87            }
88        }
89        ViewEntryTrigger::TimeoutCert(tc) => {
90            if let Some(hqc) = tc.highest_qc() {
91                state.update_highest_qc(hqc);
92            }
93            if am_leader {
94                state.step = ViewStep::WaitingForStatus;
95            } else {
96                let leader_id = state
97                    .validator_set
98                    .leader_for_view(view)
99                    .expect("empty validator set")
100                    .id;
101                let msg_bytes = status_signing_bytes(
102                    &state.chain_id_hash,
103                    state.current_epoch.number,
104                    view,
105                    &state.locked_qc,
106                );
107                let sig = signer.sign(&msg_bytes);
108                network.send_to(
109                    leader_id,
110                    ConsensusMessage::StatusCert {
111                        locked_qc: state.locked_qc.clone(),
112                        validator: state.validator_id,
113                        signature: sig,
114                    },
115                );
116                state.step = ViewStep::WaitingForProposal;
117            }
118        }
119    }
120}
121
122/// Execute step (2): Leader proposes
123pub fn propose(
124    state: &mut ConsensusState,
125    store: &mut dyn BlockStore,
126    network: &dyn NetworkSink,
127    app: &dyn Application,
128    signer: &dyn Signer,
129    evidence: Vec<EquivocationProof>,
130) -> Result<Block> {
131    let justify = state
132        .highest_qc
133        .clone()
134        .c(d!("no QC to justify proposal"))?;
135
136    let parent_hash = justify.block_hash;
137    let parent = store
138        .get_block(&parent_hash)
139        .c(d!("parent block not found"))?;
140    let height = parent.height.next();
141
142    let ctx = BlockContext {
143        height,
144        view: state.current_view,
145        proposer: state.validator_id,
146        epoch: state.current_epoch.number,
147        epoch_start_view: state.current_epoch.start_view,
148        validator_set: &state.validator_set,
149        vote_extensions: std::mem::take(&mut state.pending_vote_extensions),
150    };
151
152    let payload = app.create_payload(&ctx);
153
154    if !evidence.is_empty() {
155        info!(
156            validator = %state.validator_id,
157            count = evidence.len(),
158            "embedding equivocation evidence in block"
159        );
160    }
161
162    // BFT Time: proposer sets current Unix timestamp in milliseconds.
163    let timestamp = std::time::SystemTime::now()
164        .duration_since(std::time::UNIX_EPOCH)
165        .unwrap_or_default()
166        .as_millis() as u64;
167
168    let mut block = Block {
169        height,
170        parent_hash,
171        view: state.current_view,
172        proposer: state.validator_id,
173        timestamp,
174        payload,
175        app_hash: state.last_app_hash,
176        evidence,
177        hash: BlockHash::GENESIS, // placeholder
178    };
179    block.hash = compute_block_hash(&block);
180
181    store.put_block(block.clone());
182
183    let msg_bytes = proposal_signing_bytes(
184        &state.chain_id_hash,
185        state.current_epoch.number,
186        &block,
187        &justify,
188    );
189    let signature = signer.sign(&msg_bytes);
190
191    info!(
192        validator = %state.validator_id,
193        view = %state.current_view,
194        height = height.as_u64(),
195        hash = %block.hash,
196        "proposing block"
197    );
198
199    network.broadcast(ConsensusMessage::Propose {
200        block: Box::new(block.clone()),
201        justify: Box::new(justify),
202        double_cert: state.highest_double_cert.clone().map(Box::new),
203        signature,
204    });
205
206    state.step = ViewStep::CollectingVotes;
207    Ok(block)
208}
209
210/// Incoming proposal data from the leader.
211pub struct ProposalData {
212    pub block: Block,
213    pub justify: QuorumCertificate,
214    pub double_cert: Option<DoubleCertificate>,
215}
216
217/// Execute step (3): Replica receives proposal, validates, votes.
218/// Returns `Option<Epoch>` if fast-forward commit triggered an epoch change.
219/// Result of on_proposal that may include a fast-forward commit.
220pub struct ProposalResult {
221    /// Pending epoch from the fast-forward commit (if any).
222    pub pending_epoch: Option<Epoch>,
223    /// CommitResult from the fast-forward DC commit (if any).
224    /// The engine must process this for WAL, tx indexing, evidence marking, etc.
225    pub commit_result: Option<CommitResult>,
226}
227
228pub fn on_proposal(
229    state: &mut ConsensusState,
230    proposal: ProposalData,
231    store: &mut dyn BlockStore,
232    network: &dyn NetworkSink,
233    app: &dyn Application,
234    signer: &dyn Signer,
235) -> Result<ProposalResult> {
236    let ProposalData {
237        block,
238        justify,
239        double_cert,
240    } = proposal;
241    if state.step != ViewStep::WaitingForProposal {
242        debug!(
243            validator = %state.validator_id,
244            step = ?state.step,
245            "ignoring proposal, not waiting"
246        );
247        return Ok(ProposalResult {
248            pending_epoch: None,
249            commit_result: None,
250        });
251    }
252
253    // Safety check: justify.rank() >= locked_qc.rank()
254    if let Some(ref locked) = state.locked_qc
255        && justify.rank() < locked.rank()
256    {
257        warn!(
258            validator = %state.validator_id,
259            justify_view = %justify.view,
260            locked_view = %locked.view,
261            "rejecting proposal: justify rank < locked rank"
262        );
263        return Err(eg!("proposal justify rank below locked QC rank"));
264    }
265
266    // Verify proposer is the rightful leader for this view
267    let expected_leader = state
268        .validator_set
269        .leader_for_view(block.view)
270        .ok_or_else(|| eg!("empty validator set"))?
271        .id;
272    if block.proposer != expected_leader {
273        return Err(eg!(
274            "block proposer {} is not leader {} for view {}",
275            block.proposer,
276            expected_leader,
277            block.view
278        ));
279    }
280
281    // Verify block hash integrity
282    let expected_hash = hotmint_crypto::compute_block_hash(&block);
283    if block.hash != expected_hash {
284        return Err(eg!(
285            "block hash mismatch: declared {} != computed {}",
286            block.hash,
287            expected_hash
288        ));
289    }
290
291    // C-4: Verify block extends the chain certified by the justify QC.
292    // The genesis QC certifies BlockHash::GENESIS, and the first block's
293    // parent is also GENESIS, so this check is correct for height 1 as well.
294    if block.parent_hash != justify.block_hash {
295        return Err(eg!(
296            "block parent_hash {} does not match justify block_hash {}",
297            block.parent_hash,
298            justify.block_hash
299        ));
300    }
301
302    // BFT Time: verify the block timestamp is monotonically non-decreasing
303    // and within a reasonable drift window of the local clock.
304    // Skip only for genesis (height 0); height 1+ MUST have valid timestamps.
305    if block.height.as_u64() > 0 {
306        if block.timestamp == 0 {
307            return Err(eg!("non-genesis block has timestamp 0"));
308        }
309        // Check monotonicity against parent block (if available).
310        if let Some(parent) = store.get_block(&block.parent_hash)
311            && block.timestamp < parent.timestamp
312        {
313            return Err(eg!(
314                "block timestamp {} < parent timestamp {}",
315                block.timestamp,
316                parent.timestamp
317            ));
318        }
319        // Allow up to 15 seconds of clock drift into the future.
320        let now_ms = std::time::SystemTime::now()
321            .duration_since(std::time::UNIX_EPOCH)
322            .unwrap_or_default()
323            .as_millis() as u64;
324        const MAX_FUTURE_DRIFT_MS: u64 = 15_000;
325        if block.timestamp > now_ms + MAX_FUTURE_DRIFT_MS {
326            return Err(eg!(
327                "block timestamp {} is too far in the future (local: {})",
328                block.timestamp,
329                now_ms
330            ));
331        }
332    }
333
334    let ctx = BlockContext {
335        height: block.height,
336        view: block.view,
337        proposer: block.proposer,
338        epoch: state.current_epoch.number,
339        epoch_start_view: state.current_epoch.start_view,
340        validator_set: &state.validator_set,
341        vote_extensions: vec![],
342    };
343
344    if !app.validate_block(&block, &ctx) {
345        return Err(eg!("application rejected block"));
346    }
347
348    // Store the block
349    store.put_block(block.clone());
350
351    // Update highest QC
352    state.update_highest_qc(&justify);
353
354    // Fast-forward commit via double cert (if present).
355    // IMPORTANT: this MUST happen BEFORE the app_hash check below.
356    // block.app_hash = state-after-parent, and the DC in this proposal commits
357    // the parent block, producing that exact state root.  Processing the DC
358    // first keeps state.last_app_hash in sync so the check below is consistent
359    // for both the leader (who committed the parent independently) and replicas
360    // (who commit the parent only via this DC).
361    let mut pending_epoch = None;
362    let mut fast_forward_commit = None;
363    if let Some(ref dc) = double_cert {
364        match try_commit(
365            dc,
366            store,
367            app,
368            &mut state.last_committed_height,
369            &state.current_epoch,
370        ) {
371            Ok(result) => {
372                if !result.committed_blocks.is_empty() {
373                    state.last_app_hash = result.last_app_hash;
374                }
375                pending_epoch = result.pending_epoch.clone();
376                fast_forward_commit = Some(result);
377            }
378            Err(e) => {
379                return Err(eg!("try_commit failed during fast-forward: {}", e));
380            }
381        }
382    }
383
384    // Verify app_hash matches local state AFTER fast-forward commit.
385    // Skip when the application does not track state roots (e.g. fullnode
386    // running NoopApplication against a chain produced by a real ABCI app).
387    if app.tracks_app_hash() && block.app_hash != state.last_app_hash {
388        return Err(eg!(
389            "app_hash mismatch: block {} != local {}",
390            block.app_hash,
391            state.last_app_hash
392        ));
393    }
394
395    // Vote (first phase) → send to current leader (only if we have voting power)
396    if state.validator_set.power_of(state.validator_id) > 0 {
397        let vote_bytes = Vote::signing_bytes(
398            &state.chain_id_hash,
399            state.current_epoch.number,
400            state.current_view,
401            &block.hash,
402            VoteType::Vote,
403        );
404        let signature = signer.sign(&vote_bytes);
405        let vote = Vote {
406            block_hash: block.hash,
407            view: state.current_view,
408            validator: state.validator_id,
409            signature,
410            vote_type: VoteType::Vote,
411            extension: None,
412        };
413
414        let leader_id = state
415            .validator_set
416            .leader_for_view(state.current_view)
417            .expect("empty validator set")
418            .id;
419        info!(
420            validator = %state.validator_id,
421            view = %state.current_view,
422            hash = %block.hash,
423            "voting for block"
424        );
425        network.send_to(leader_id, ConsensusMessage::VoteMsg(vote));
426    }
427
428    state.step = ViewStep::Voted;
429    Ok(ProposalResult {
430        pending_epoch,
431        commit_result: fast_forward_commit,
432    })
433}
434
435/// Execute step (4): Leader collected 2f+1 votes → form QC → broadcast prepare
436pub fn on_votes_collected(
437    state: &mut ConsensusState,
438    qc: QuorumCertificate,
439    network: &dyn NetworkSink,
440    signer: &dyn Signer,
441) {
442    info!(
443        validator = %state.validator_id,
444        view = %state.current_view,
445        hash = %qc.block_hash,
446        "QC formed, broadcasting prepare"
447    );
448
449    state.update_highest_qc(&qc);
450
451    let msg_bytes = prepare_signing_bytes(&state.chain_id_hash, state.current_epoch.number, &qc);
452    let signature = signer.sign(&msg_bytes);
453
454    network.broadcast(ConsensusMessage::Prepare {
455        certificate: qc,
456        signature,
457    });
458
459    state.step = ViewStep::Prepared;
460}
461
462/// Execute step (5): Replica receives prepare → update lock → send vote2 to next leader
463///
464/// `vote_extension` is an optional ABCI++ vote extension to attach to the
465/// Vote2 message.  The caller (engine) is responsible for generating the
466/// extension via `Application::extend_vote` before invoking this function.
467pub fn on_prepare(
468    state: &mut ConsensusState,
469    qc: QuorumCertificate,
470    network: &dyn NetworkSink,
471    signer: &dyn Signer,
472    vote_extension: Option<Vec<u8>>,
473) {
474    // Update lock to this QC
475    state.update_locked_qc(&qc);
476    state.update_highest_qc(&qc);
477
478    // Vote2 → send to next leader (only if we have voting power)
479    if state.validator_set.power_of(state.validator_id) > 0 {
480        let vote_bytes = Vote::signing_bytes(
481            &state.chain_id_hash,
482            state.current_epoch.number,
483            state.current_view,
484            &qc.block_hash,
485            VoteType::Vote2,
486        );
487        let signature = signer.sign(&vote_bytes);
488        let vote = Vote {
489            block_hash: qc.block_hash,
490            view: state.current_view,
491            validator: state.validator_id,
492            signature,
493            vote_type: VoteType::Vote2,
494            extension: vote_extension,
495        };
496
497        let next_leader_id = leader::next_leader(&state.validator_set, state.current_view);
498        info!(
499            validator = %state.validator_id,
500            view = %state.current_view,
501            hash = %qc.block_hash,
502            "sending vote2 to next leader {}",
503            next_leader_id
504        );
505        network.send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
506    }
507
508    state.step = ViewStep::SentVote2;
509}
510
511// --- Signing helpers ---
512
513pub(crate) fn status_signing_bytes(
514    chain_id_hash: &[u8; 32],
515    epoch: EpochNumber,
516    view: ViewNumber,
517    locked_qc: &Option<QuorumCertificate>,
518) -> Vec<u8> {
519    let tag = b"HOTMINT_STATUS_V1\0";
520    let mut buf = Vec::with_capacity(tag.len() + 32 + 8 + 8 + 40);
521    buf.extend_from_slice(tag);
522    buf.extend_from_slice(chain_id_hash);
523    buf.extend_from_slice(&epoch.as_u64().to_le_bytes());
524    buf.extend_from_slice(&view.as_u64().to_le_bytes());
525    if let Some(qc) = locked_qc {
526        buf.extend_from_slice(&qc.block_hash.0);
527        buf.extend_from_slice(&qc.view.as_u64().to_le_bytes());
528    }
529    buf
530}
531
532pub(crate) fn proposal_signing_bytes(
533    chain_id_hash: &[u8; 32],
534    epoch: EpochNumber,
535    block: &Block,
536    justify: &QuorumCertificate,
537) -> Vec<u8> {
538    let tag = b"HOTMINT_PROPOSAL_V1\0";
539    let mut buf = Vec::with_capacity(tag.len() + 32 + 8 + 32 + 32 + 8);
540    buf.extend_from_slice(tag);
541    buf.extend_from_slice(chain_id_hash);
542    buf.extend_from_slice(&epoch.as_u64().to_le_bytes());
543    buf.extend_from_slice(&block.hash.0);
544    buf.extend_from_slice(&justify.block_hash.0);
545    buf.extend_from_slice(&justify.view.as_u64().to_le_bytes());
546    buf
547}
548
549pub(crate) fn prepare_signing_bytes(
550    chain_id_hash: &[u8; 32],
551    epoch: EpochNumber,
552    qc: &QuorumCertificate,
553) -> Vec<u8> {
554    let tag = b"HOTMINT_PREPARE_V1\0";
555    let mut buf = Vec::with_capacity(tag.len() + 32 + 8 + 32 + 8);
556    buf.extend_from_slice(tag);
557    buf.extend_from_slice(chain_id_hash);
558    buf.extend_from_slice(&epoch.as_u64().to_le_bytes());
559    buf.extend_from_slice(&qc.block_hash.0);
560    buf.extend_from_slice(&qc.view.as_u64().to_le_bytes());
561    buf
562}