Skip to main content

hotmint_consensus/
view_protocol.rs

1use ruc::*;
2
3use crate::application::Application;
4use crate::commit::try_commit;
5use crate::leader;
6use crate::network::NetworkSink;
7use crate::state::{ConsensusState, ViewRole, ViewStep};
8use crate::store::BlockStore;
9use hotmint_crypto::hash::hash_block;
10use hotmint_types::context::BlockContext;
11use hotmint_types::epoch::Epoch;
12use hotmint_types::vote::VoteType;
13use hotmint_types::*;
14use tracing::{debug, info, warn};
15
/// Trigger that causes us to enter a new view
///
/// Consumed by `enter_view`, which absorbs whatever certificate the variant
/// carries before settling the validator into its starting step for the view.
pub enum ViewEntryTrigger {
    /// Entered with a double certificate. `enter_view` feeds its `outer_qc`
    /// into the highest-QC tracking and stores the certificate itself as
    /// `highest_double_cert`.
    DoubleCert(DoubleCertificate),
    /// Entered with a timeout certificate. If the certificate exposes a
    /// highest QC (`highest_qc()` returns `Some`), `enter_view` feeds it into
    /// the highest-QC tracking.
    TimeoutCert(TimeoutCertificate),
    /// Entered without any certificate. `enter_view` sends no status message
    /// for this variant.
    Genesis,
}
22
23/// Execute step (1): Enter view
24pub fn enter_view(
25    state: &mut ConsensusState,
26    view: ViewNumber,
27    trigger: ViewEntryTrigger,
28    network: &dyn NetworkSink,
29    signer: &dyn Signer,
30) {
31    state.current_view = view;
32    state.step = ViewStep::Entered;
33
34    let am_leader = leader::is_leader(&state.validator_set, view, state.validator_id);
35    state.role = if am_leader {
36        ViewRole::Leader
37    } else {
38        ViewRole::Replica
39    };
40
41    info!(
42        validator = %state.validator_id,
43        view = %view,
44        role = ?state.role,
45        epoch = %state.current_epoch.number,
46        "entering view"
47    );
48
49    match trigger {
50        ViewEntryTrigger::Genesis => {
51            if am_leader {
52                // Genesis leader enters WaitingForStatus; engine calls try_propose() directly
53                state.step = ViewStep::WaitingForStatus;
54            } else {
55                state.step = ViewStep::WaitingForProposal;
56            }
57        }
58        ViewEntryTrigger::DoubleCert(dc) => {
59            state.update_highest_qc(&dc.outer_qc);
60            state.highest_double_cert = Some(dc);
61            if am_leader {
62                state.step = ViewStep::WaitingForStatus;
63            } else {
64                // Send status to new leader
65                let leader_id = state.validator_set.leader_for_view(view).id;
66                let msg_bytes = status_signing_bytes(view, &state.locked_qc);
67                let sig = signer.sign(&msg_bytes);
68                network.send_to(
69                    leader_id,
70                    ConsensusMessage::StatusCert {
71                        locked_qc: state.locked_qc.clone(),
72                        validator: state.validator_id,
73                        signature: sig,
74                    },
75                );
76                state.step = ViewStep::WaitingForProposal;
77            }
78        }
79        ViewEntryTrigger::TimeoutCert(tc) => {
80            if let Some(hqc) = tc.highest_qc() {
81                state.update_highest_qc(hqc);
82            }
83            if am_leader {
84                state.step = ViewStep::WaitingForStatus;
85            } else {
86                let leader_id = state.validator_set.leader_for_view(view).id;
87                let msg_bytes = status_signing_bytes(view, &state.locked_qc);
88                let sig = signer.sign(&msg_bytes);
89                network.send_to(
90                    leader_id,
91                    ConsensusMessage::StatusCert {
92                        locked_qc: state.locked_qc.clone(),
93                        validator: state.validator_id,
94                        signature: sig,
95                    },
96                );
97                state.step = ViewStep::WaitingForProposal;
98            }
99        }
100    }
101}
102
103/// Execute step (2): Leader proposes
104pub fn propose(
105    state: &mut ConsensusState,
106    store: &mut dyn BlockStore,
107    network: &dyn NetworkSink,
108    app: &dyn Application,
109    signer: &dyn Signer,
110) -> Result<Block> {
111    let justify = state
112        .highest_qc
113        .clone()
114        .c(d!("no QC to justify proposal"))?;
115
116    let parent_hash = justify.block_hash;
117    let parent = store
118        .get_block(&parent_hash)
119        .c(d!("parent block not found"))?;
120    let height = parent.height.next();
121
122    let ctx = BlockContext {
123        height,
124        view: state.current_view,
125        proposer: state.validator_id,
126        epoch: state.current_epoch.number,
127        epoch_start_view: state.current_epoch.start_view,
128        validator_set: &state.validator_set,
129    };
130
131    let payload = app.create_payload(&ctx);
132
133    let mut block = Block {
134        height,
135        parent_hash,
136        view: state.current_view,
137        proposer: state.validator_id,
138        payload,
139        hash: BlockHash::GENESIS, // placeholder
140    };
141    block.hash = hash_block(&block);
142
143    store.put_block(block.clone());
144
145    let msg_bytes = proposal_signing_bytes(&block, &justify);
146    let signature = signer.sign(&msg_bytes);
147
148    info!(
149        validator = %state.validator_id,
150        view = %state.current_view,
151        height = height.as_u64(),
152        hash = %block.hash,
153        "proposing block"
154    );
155
156    network.broadcast(ConsensusMessage::Propose {
157        block: Box::new(block.clone()),
158        justify: Box::new(justify),
159        double_cert: state.highest_double_cert.clone().map(Box::new),
160        signature,
161    });
162
163    state.step = ViewStep::CollectingVotes;
164    Ok(block)
165}
166
/// Incoming proposal data from the leader.
///
/// Consumed by `on_proposal`, which destructures it into its three parts.
pub struct ProposalData {
    /// The proposed block. `on_proposal` checks its proposer and hash, asks
    /// the application to validate it, and stores it before voting.
    pub block: Block,
    /// Quorum certificate accompanying the proposal. `on_proposal` rejects the
    /// proposal when this QC ranks below the locked QC, and otherwise folds it
    /// into the highest known QC.
    pub justify: QuorumCertificate,
    /// Optional double certificate. When present, `on_proposal` passes it to
    /// `try_commit` (fast-forward commit).
    pub double_cert: Option<DoubleCertificate>,
}
173
174/// Execute step (3): Replica receives proposal, validates, votes.
175/// Returns `Option<Epoch>` if fast-forward commit triggered an epoch change.
176pub fn on_proposal(
177    state: &mut ConsensusState,
178    proposal: ProposalData,
179    store: &mut dyn BlockStore,
180    network: &dyn NetworkSink,
181    app: &dyn Application,
182    signer: &dyn Signer,
183) -> Result<Option<Epoch>> {
184    let ProposalData {
185        block,
186        justify,
187        double_cert,
188    } = proposal;
189    if state.step != ViewStep::WaitingForProposal {
190        debug!(
191            validator = %state.validator_id,
192            step = ?state.step,
193            "ignoring proposal, not waiting"
194        );
195        return Ok(None);
196    }
197
198    // Safety check: justify.rank() >= locked_qc.rank()
199    if let Some(ref locked) = state.locked_qc
200        && justify.rank() < locked.rank()
201    {
202        warn!(
203            validator = %state.validator_id,
204            justify_view = %justify.view,
205            locked_view = %locked.view,
206            "rejecting proposal: justify rank < locked rank"
207        );
208        return Err(eg!("proposal justify rank below locked QC rank"));
209    }
210
211    // Verify proposer is the rightful leader for this view
212    let expected_leader = state.validator_set.leader_for_view(block.view).id;
213    if block.proposer != expected_leader {
214        return Err(eg!(
215            "block proposer {} is not leader {} for view {}",
216            block.proposer,
217            expected_leader,
218            block.view
219        ));
220    }
221
222    // Verify block hash integrity
223    let expected_hash = hotmint_crypto::hash_block(&block);
224    if block.hash != expected_hash {
225        return Err(eg!(
226            "block hash mismatch: declared {} != computed {}",
227            block.hash,
228            expected_hash
229        ));
230    }
231
232    let ctx = BlockContext {
233        height: block.height,
234        view: block.view,
235        proposer: block.proposer,
236        epoch: state.current_epoch.number,
237        epoch_start_view: state.current_epoch.start_view,
238        validator_set: &state.validator_set,
239    };
240
241    if !app.validate_block(&block, &ctx) {
242        return Err(eg!("application rejected block"));
243    }
244
245    // Store the block
246    store.put_block(block.clone());
247
248    // Update highest QC
249    state.update_highest_qc(&justify);
250
251    // Try commit if double cert present (fast-forward)
252    let mut pending_epoch = None;
253    if let Some(ref dc) = double_cert {
254        match try_commit(
255            dc,
256            store,
257            app,
258            &mut state.last_committed_height,
259            &state.current_epoch,
260        ) {
261            Ok(result) => {
262                pending_epoch = result.pending_epoch;
263            }
264            Err(e) => {
265                warn!(error = %e, "try_commit failed during fast-forward in on_proposal");
266            }
267        }
268    }
269
270    // Vote (first phase) → send to current leader
271    let vote_bytes = Vote::signing_bytes(state.current_view, &block.hash, VoteType::Vote);
272    let signature = signer.sign(&vote_bytes);
273    let vote = Vote {
274        block_hash: block.hash,
275        view: state.current_view,
276        validator: state.validator_id,
277        signature,
278        vote_type: VoteType::Vote,
279    };
280
281    let leader_id = state.validator_set.leader_for_view(state.current_view).id;
282    info!(
283        validator = %state.validator_id,
284        view = %state.current_view,
285        hash = %block.hash,
286        "voting for block"
287    );
288    network.send_to(leader_id, ConsensusMessage::VoteMsg(vote));
289
290    state.step = ViewStep::Voted;
291    Ok(pending_epoch)
292}
293
294/// Execute step (4): Leader collected 2f+1 votes → form QC → broadcast prepare
295pub fn on_votes_collected(
296    state: &mut ConsensusState,
297    qc: QuorumCertificate,
298    network: &dyn NetworkSink,
299    signer: &dyn Signer,
300) {
301    info!(
302        validator = %state.validator_id,
303        view = %state.current_view,
304        hash = %qc.block_hash,
305        "QC formed, broadcasting prepare"
306    );
307
308    state.update_highest_qc(&qc);
309
310    let msg_bytes = prepare_signing_bytes(&qc);
311    let signature = signer.sign(&msg_bytes);
312
313    network.broadcast(ConsensusMessage::Prepare {
314        certificate: qc,
315        signature,
316    });
317
318    state.step = ViewStep::Prepared;
319}
320
321/// Execute step (5): Replica receives prepare → update lock → send vote2 to next leader
322pub fn on_prepare(
323    state: &mut ConsensusState,
324    qc: QuorumCertificate,
325    network: &dyn NetworkSink,
326    signer: &dyn Signer,
327) {
328    // Update lock to this QC
329    state.update_locked_qc(&qc);
330    state.update_highest_qc(&qc);
331
332    // Vote2 → send to next leader
333    let vote_bytes = Vote::signing_bytes(state.current_view, &qc.block_hash, VoteType::Vote2);
334    let signature = signer.sign(&vote_bytes);
335    let vote = Vote {
336        block_hash: qc.block_hash,
337        view: state.current_view,
338        validator: state.validator_id,
339        signature,
340        vote_type: VoteType::Vote2,
341    };
342
343    let next_leader_id = leader::next_leader(&state.validator_set, state.current_view);
344    info!(
345        validator = %state.validator_id,
346        view = %state.current_view,
347        hash = %qc.block_hash,
348        "sending vote2 to next leader {}",
349        next_leader_id
350    );
351    network.send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
352
353    state.step = ViewStep::SentVote2;
354}
355
356// --- Signing helpers ---
357
358pub(crate) fn status_signing_bytes(
359    view: ViewNumber,
360    locked_qc: &Option<QuorumCertificate>,
361) -> Vec<u8> {
362    let mut buf = Vec::new();
363    buf.push(b'S');
364    buf.extend_from_slice(&view.as_u64().to_le_bytes());
365    if let Some(qc) = locked_qc {
366        buf.extend_from_slice(&qc.block_hash.0);
367        buf.extend_from_slice(&qc.view.as_u64().to_le_bytes());
368    }
369    buf
370}
371
372pub(crate) fn proposal_signing_bytes(block: &Block, justify: &QuorumCertificate) -> Vec<u8> {
373    let mut buf = Vec::new();
374    buf.push(b'P');
375    buf.extend_from_slice(&block.hash.0);
376    buf.extend_from_slice(&justify.block_hash.0);
377    buf.extend_from_slice(&justify.view.as_u64().to_le_bytes());
378    buf
379}
380
381pub(crate) fn prepare_signing_bytes(qc: &QuorumCertificate) -> Vec<u8> {
382    let mut buf = Vec::new();
383    buf.push(b'Q');
384    buf.extend_from_slice(&qc.block_hash.0);
385    buf.extend_from_slice(&qc.view.as_u64().to_le_bytes());
386    buf
387}