Skip to main content

hotmint_consensus/
view_protocol.rs

1use ruc::*;
2
3use crate::application::Application;
4use crate::commit::try_commit;
5use crate::leader;
6use crate::network::NetworkSink;
7use crate::state::{ConsensusState, ViewRole, ViewStep};
8use crate::store::BlockStore;
9use hotmint_crypto::hash::hash_block;
10use hotmint_types::context::BlockContext;
11use hotmint_types::epoch::Epoch;
12use hotmint_types::vote::VoteType;
13use hotmint_types::*;
14use tracing::{debug, info, warn};
15
16/// Trigger that causes us to enter a new view
17pub enum ViewEntryTrigger {
18    DoubleCert(DoubleCertificate),
19    TimeoutCert(TimeoutCertificate),
20    Genesis,
21}
22
23/// Execute step (1): Enter view
24pub fn enter_view(
25    state: &mut ConsensusState,
26    view: ViewNumber,
27    trigger: ViewEntryTrigger,
28    network: &dyn NetworkSink,
29    signer: &dyn Signer,
30) {
31    state.current_view = view;
32    state.step = ViewStep::Entered;
33
34    let am_leader = leader::is_leader(&state.validator_set, view, state.validator_id);
35    state.role = if am_leader {
36        ViewRole::Leader
37    } else {
38        ViewRole::Replica
39    };
40
41    info!(
42        validator = %state.validator_id,
43        view = %view,
44        role = ?state.role,
45        epoch = %state.current_epoch.number,
46        "entering view"
47    );
48
49    match trigger {
50        ViewEntryTrigger::Genesis => {
51            if am_leader {
52                state.step = ViewStep::WaitingForStatus;
53                // In genesis, leader can propose immediately (no status to wait for)
54                state.step = ViewStep::Proposed; // will be set properly by propose()
55            } else {
56                state.step = ViewStep::WaitingForProposal;
57            }
58        }
59        ViewEntryTrigger::DoubleCert(dc) => {
60            state.update_highest_qc(&dc.outer_qc);
61            state.highest_double_cert = Some(dc);
62            if am_leader {
63                state.step = ViewStep::WaitingForStatus;
64            } else {
65                // Send status to new leader
66                let leader_id = state.validator_set.leader_for_view(view).id;
67                let msg_bytes = status_signing_bytes(view, &state.locked_qc);
68                let sig = signer.sign(&msg_bytes);
69                network.send_to(
70                    leader_id,
71                    ConsensusMessage::StatusCert {
72                        locked_qc: state.locked_qc.clone(),
73                        validator: state.validator_id,
74                        signature: sig,
75                    },
76                );
77                state.step = ViewStep::WaitingForProposal;
78            }
79        }
80        ViewEntryTrigger::TimeoutCert(tc) => {
81            if let Some(hqc) = tc.highest_qc() {
82                state.update_highest_qc(hqc);
83            }
84            if am_leader {
85                state.step = ViewStep::WaitingForStatus;
86            } else {
87                let leader_id = state.validator_set.leader_for_view(view).id;
88                let msg_bytes = status_signing_bytes(view, &state.locked_qc);
89                let sig = signer.sign(&msg_bytes);
90                network.send_to(
91                    leader_id,
92                    ConsensusMessage::StatusCert {
93                        locked_qc: state.locked_qc.clone(),
94                        validator: state.validator_id,
95                        signature: sig,
96                    },
97                );
98                state.step = ViewStep::WaitingForProposal;
99            }
100        }
101    }
102}
103
104/// Execute step (2): Leader proposes
105pub fn propose(
106    state: &mut ConsensusState,
107    store: &mut dyn BlockStore,
108    network: &dyn NetworkSink,
109    app: &dyn Application,
110    signer: &dyn Signer,
111) -> Result<Block> {
112    let justify = state
113        .highest_qc
114        .clone()
115        .c(d!("no QC to justify proposal"))?;
116
117    let parent_hash = justify.block_hash;
118    let parent = store
119        .get_block(&parent_hash)
120        .c(d!("parent block not found"))?;
121    let height = parent.height.next();
122
123    let ctx = BlockContext {
124        height,
125        view: state.current_view,
126        proposer: state.validator_id,
127        epoch: state.current_epoch.number,
128        validator_set: &state.validator_set,
129    };
130
131    let payload = app.create_payload(&ctx);
132
133    let mut block = Block {
134        height,
135        parent_hash,
136        view: state.current_view,
137        proposer: state.validator_id,
138        payload,
139        hash: BlockHash::GENESIS, // placeholder
140    };
141    block.hash = hash_block(&block);
142
143    store.put_block(block.clone());
144
145    let msg_bytes = proposal_signing_bytes(&block, &justify);
146    let signature = signer.sign(&msg_bytes);
147
148    info!(
149        validator = %state.validator_id,
150        view = %state.current_view,
151        height = height.as_u64(),
152        hash = %block.hash,
153        "proposing block"
154    );
155
156    network.broadcast(ConsensusMessage::Propose {
157        block: Box::new(block.clone()),
158        justify: Box::new(justify),
159        double_cert: state.highest_double_cert.clone().map(Box::new),
160        signature,
161    });
162
163    state.step = ViewStep::CollectingVotes;
164    Ok(block)
165}
166
167/// Execute step (3): Replica receives proposal, validates, votes.
168/// Returns `Option<Epoch>` if fast-forward commit triggered an epoch change.
169#[allow(clippy::too_many_arguments)]
170pub fn on_proposal(
171    state: &mut ConsensusState,
172    block: Block,
173    justify: QuorumCertificate,
174    double_cert: Option<DoubleCertificate>,
175    store: &mut dyn BlockStore,
176    network: &dyn NetworkSink,
177    app: &dyn Application,
178    signer: &dyn Signer,
179) -> Result<Option<Epoch>> {
180    if state.step != ViewStep::WaitingForProposal {
181        debug!(
182            validator = %state.validator_id,
183            step = ?state.step,
184            "ignoring proposal, not waiting"
185        );
186        return Ok(None);
187    }
188
189    // Safety check: justify.rank() >= locked_qc.rank()
190    if let Some(ref locked) = state.locked_qc
191        && justify.rank() < locked.rank()
192    {
193        warn!(
194            validator = %state.validator_id,
195            justify_view = %justify.view,
196            locked_view = %locked.view,
197            "rejecting proposal: justify rank < locked rank"
198        );
199        return Err(eg!("proposal justify rank below locked QC rank"));
200    }
201
202    let ctx = BlockContext {
203        height: block.height,
204        view: block.view,
205        proposer: block.proposer,
206        epoch: state.current_epoch.number,
207        validator_set: &state.validator_set,
208    };
209
210    if !app.validate_block(&block, &ctx) {
211        return Err(eg!("application rejected block"));
212    }
213
214    // Store the block
215    store.put_block(block.clone());
216
217    // Update highest QC
218    state.update_highest_qc(&justify);
219
220    // Try commit if double cert present (fast-forward)
221    let mut pending_epoch = None;
222    if let Some(ref dc) = double_cert {
223        match try_commit(
224            dc,
225            store,
226            app,
227            &mut state.last_committed_height,
228            &state.current_epoch,
229        ) {
230            Ok(result) => {
231                pending_epoch = result.pending_epoch;
232            }
233            Err(e) => {
234                warn!(error = %e, "try_commit failed during fast-forward in on_proposal");
235            }
236        }
237    }
238
239    // Vote (first phase) → send to current leader
240    let vote_bytes = Vote::signing_bytes(state.current_view, &block.hash, VoteType::Vote);
241    let signature = signer.sign(&vote_bytes);
242    let vote = Vote {
243        block_hash: block.hash,
244        view: state.current_view,
245        validator: state.validator_id,
246        signature,
247        vote_type: VoteType::Vote,
248    };
249
250    let leader_id = state.validator_set.leader_for_view(state.current_view).id;
251    info!(
252        validator = %state.validator_id,
253        view = %state.current_view,
254        hash = %block.hash,
255        "voting for block"
256    );
257    network.send_to(leader_id, ConsensusMessage::VoteMsg(vote));
258
259    state.step = ViewStep::Voted;
260    Ok(pending_epoch)
261}
262
263/// Execute step (4): Leader collected 2f+1 votes → form QC → broadcast prepare
264pub fn on_votes_collected(
265    state: &mut ConsensusState,
266    qc: QuorumCertificate,
267    network: &dyn NetworkSink,
268    signer: &dyn Signer,
269) {
270    info!(
271        validator = %state.validator_id,
272        view = %state.current_view,
273        hash = %qc.block_hash,
274        "QC formed, broadcasting prepare"
275    );
276
277    state.update_highest_qc(&qc);
278
279    let msg_bytes = prepare_signing_bytes(&qc);
280    let signature = signer.sign(&msg_bytes);
281
282    network.broadcast(ConsensusMessage::Prepare {
283        certificate: qc,
284        signature,
285    });
286
287    state.step = ViewStep::Prepared;
288}
289
290/// Execute step (5): Replica receives prepare → update lock → send vote2 to next leader
291pub fn on_prepare(
292    state: &mut ConsensusState,
293    qc: QuorumCertificate,
294    network: &dyn NetworkSink,
295    signer: &dyn Signer,
296) {
297    // Update lock to this QC
298    state.update_locked_qc(&qc);
299    state.update_highest_qc(&qc);
300
301    // Vote2 → send to next leader
302    let vote_bytes = Vote::signing_bytes(state.current_view, &qc.block_hash, VoteType::Vote2);
303    let signature = signer.sign(&vote_bytes);
304    let vote = Vote {
305        block_hash: qc.block_hash,
306        view: state.current_view,
307        validator: state.validator_id,
308        signature,
309        vote_type: VoteType::Vote2,
310    };
311
312    let next_leader_id = leader::next_leader(&state.validator_set, state.current_view);
313    info!(
314        validator = %state.validator_id,
315        view = %state.current_view,
316        hash = %qc.block_hash,
317        "sending vote2 to next leader {}",
318        next_leader_id
319    );
320    network.send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
321
322    state.step = ViewStep::SentVote2;
323}
324
325// --- Signing helpers ---
326
327fn status_signing_bytes(view: ViewNumber, locked_qc: &Option<QuorumCertificate>) -> Vec<u8> {
328    let mut buf = Vec::new();
329    buf.push(b'S');
330    buf.extend_from_slice(&view.as_u64().to_le_bytes());
331    if let Some(qc) = locked_qc {
332        buf.extend_from_slice(&qc.block_hash.0);
333        buf.extend_from_slice(&qc.view.as_u64().to_le_bytes());
334    }
335    buf
336}
337
338fn proposal_signing_bytes(block: &Block, justify: &QuorumCertificate) -> Vec<u8> {
339    let mut buf = Vec::new();
340    buf.push(b'P');
341    buf.extend_from_slice(&block.hash.0);
342    buf.extend_from_slice(&justify.block_hash.0);
343    buf.extend_from_slice(&justify.view.as_u64().to_le_bytes());
344    buf
345}
346
347fn prepare_signing_bytes(qc: &QuorumCertificate) -> Vec<u8> {
348    let mut buf = Vec::new();
349    buf.push(b'Q');
350    buf.extend_from_slice(&qc.block_hash.0);
351    buf.extend_from_slice(&qc.view.as_u64().to_le_bytes());
352    buf
353}