1use ruc::*;
2
3use crate::application::Application;
4use crate::commit::try_commit;
5use crate::leader;
6use crate::network::NetworkSink;
7use crate::state::{ConsensusState, ViewRole, ViewStep};
8use crate::store::BlockStore;
9use hotmint_crypto::hash::hash_block;
10use hotmint_types::context::BlockContext;
11use hotmint_types::epoch::Epoch;
12use hotmint_types::vote::VoteType;
13use hotmint_types::*;
14use tracing::{debug, info, warn};
15
16pub enum ViewEntryTrigger {
18 DoubleCert(DoubleCertificate),
19 TimeoutCert(TimeoutCertificate),
20 Genesis,
21}
22
23pub fn enter_view(
25 state: &mut ConsensusState,
26 view: ViewNumber,
27 trigger: ViewEntryTrigger,
28 network: &dyn NetworkSink,
29 signer: &dyn Signer,
30) {
31 state.current_view = view;
32 state.step = ViewStep::Entered;
33
34 let am_leader = leader::is_leader(&state.validator_set, view, state.validator_id);
35 state.role = if am_leader {
36 ViewRole::Leader
37 } else {
38 ViewRole::Replica
39 };
40
41 info!(
42 validator = %state.validator_id,
43 view = %view,
44 role = ?state.role,
45 epoch = %state.current_epoch.number,
46 "entering view"
47 );
48
49 match trigger {
50 ViewEntryTrigger::Genesis => {
51 if am_leader {
52 state.step = ViewStep::WaitingForStatus;
54 } else {
55 state.step = ViewStep::WaitingForProposal;
56 }
57 }
58 ViewEntryTrigger::DoubleCert(dc) => {
59 state.update_highest_qc(&dc.outer_qc);
60 state.highest_double_cert = Some(dc);
61 if am_leader {
62 state.step = ViewStep::WaitingForStatus;
63 } else {
64 let leader_id = state.validator_set.leader_for_view(view).id;
66 let msg_bytes = status_signing_bytes(view, &state.locked_qc);
67 let sig = signer.sign(&msg_bytes);
68 network.send_to(
69 leader_id,
70 ConsensusMessage::StatusCert {
71 locked_qc: state.locked_qc.clone(),
72 validator: state.validator_id,
73 signature: sig,
74 },
75 );
76 state.step = ViewStep::WaitingForProposal;
77 }
78 }
79 ViewEntryTrigger::TimeoutCert(tc) => {
80 if let Some(hqc) = tc.highest_qc() {
81 state.update_highest_qc(hqc);
82 }
83 if am_leader {
84 state.step = ViewStep::WaitingForStatus;
85 } else {
86 let leader_id = state.validator_set.leader_for_view(view).id;
87 let msg_bytes = status_signing_bytes(view, &state.locked_qc);
88 let sig = signer.sign(&msg_bytes);
89 network.send_to(
90 leader_id,
91 ConsensusMessage::StatusCert {
92 locked_qc: state.locked_qc.clone(),
93 validator: state.validator_id,
94 signature: sig,
95 },
96 );
97 state.step = ViewStep::WaitingForProposal;
98 }
99 }
100 }
101}
102
103pub fn propose(
105 state: &mut ConsensusState,
106 store: &mut dyn BlockStore,
107 network: &dyn NetworkSink,
108 app: &dyn Application,
109 signer: &dyn Signer,
110) -> Result<Block> {
111 let justify = state
112 .highest_qc
113 .clone()
114 .c(d!("no QC to justify proposal"))?;
115
116 let parent_hash = justify.block_hash;
117 let parent = store
118 .get_block(&parent_hash)
119 .c(d!("parent block not found"))?;
120 let height = parent.height.next();
121
122 let ctx = BlockContext {
123 height,
124 view: state.current_view,
125 proposer: state.validator_id,
126 epoch: state.current_epoch.number,
127 epoch_start_view: state.current_epoch.start_view,
128 validator_set: &state.validator_set,
129 };
130
131 let payload = app.create_payload(&ctx);
132
133 let mut block = Block {
134 height,
135 parent_hash,
136 view: state.current_view,
137 proposer: state.validator_id,
138 payload,
139 hash: BlockHash::GENESIS, };
141 block.hash = hash_block(&block);
142
143 store.put_block(block.clone());
144
145 let msg_bytes = proposal_signing_bytes(&block, &justify);
146 let signature = signer.sign(&msg_bytes);
147
148 info!(
149 validator = %state.validator_id,
150 view = %state.current_view,
151 height = height.as_u64(),
152 hash = %block.hash,
153 "proposing block"
154 );
155
156 network.broadcast(ConsensusMessage::Propose {
157 block: Box::new(block.clone()),
158 justify: Box::new(justify),
159 double_cert: state.highest_double_cert.clone().map(Box::new),
160 signature,
161 });
162
163 state.step = ViewStep::CollectingVotes;
164 Ok(block)
165}
166
167pub struct ProposalData {
169 pub block: Block,
170 pub justify: QuorumCertificate,
171 pub double_cert: Option<DoubleCertificate>,
172}
173
174pub fn on_proposal(
177 state: &mut ConsensusState,
178 proposal: ProposalData,
179 store: &mut dyn BlockStore,
180 network: &dyn NetworkSink,
181 app: &dyn Application,
182 signer: &dyn Signer,
183) -> Result<Option<Epoch>> {
184 let ProposalData {
185 block,
186 justify,
187 double_cert,
188 } = proposal;
189 if state.step != ViewStep::WaitingForProposal {
190 debug!(
191 validator = %state.validator_id,
192 step = ?state.step,
193 "ignoring proposal, not waiting"
194 );
195 return Ok(None);
196 }
197
198 if let Some(ref locked) = state.locked_qc
200 && justify.rank() < locked.rank()
201 {
202 warn!(
203 validator = %state.validator_id,
204 justify_view = %justify.view,
205 locked_view = %locked.view,
206 "rejecting proposal: justify rank < locked rank"
207 );
208 return Err(eg!("proposal justify rank below locked QC rank"));
209 }
210
211 let expected_leader = state.validator_set.leader_for_view(block.view).id;
213 if block.proposer != expected_leader {
214 return Err(eg!(
215 "block proposer {} is not leader {} for view {}",
216 block.proposer,
217 expected_leader,
218 block.view
219 ));
220 }
221
222 let expected_hash = hotmint_crypto::hash_block(&block);
224 if block.hash != expected_hash {
225 return Err(eg!(
226 "block hash mismatch: declared {} != computed {}",
227 block.hash,
228 expected_hash
229 ));
230 }
231
232 let ctx = BlockContext {
233 height: block.height,
234 view: block.view,
235 proposer: block.proposer,
236 epoch: state.current_epoch.number,
237 epoch_start_view: state.current_epoch.start_view,
238 validator_set: &state.validator_set,
239 };
240
241 if !app.validate_block(&block, &ctx) {
242 return Err(eg!("application rejected block"));
243 }
244
245 store.put_block(block.clone());
247
248 state.update_highest_qc(&justify);
250
251 let mut pending_epoch = None;
253 if let Some(ref dc) = double_cert {
254 match try_commit(
255 dc,
256 store,
257 app,
258 &mut state.last_committed_height,
259 &state.current_epoch,
260 ) {
261 Ok(result) => {
262 pending_epoch = result.pending_epoch;
263 }
264 Err(e) => {
265 warn!(error = %e, "try_commit failed during fast-forward in on_proposal");
266 }
267 }
268 }
269
270 let vote_bytes = Vote::signing_bytes(state.current_view, &block.hash, VoteType::Vote);
272 let signature = signer.sign(&vote_bytes);
273 let vote = Vote {
274 block_hash: block.hash,
275 view: state.current_view,
276 validator: state.validator_id,
277 signature,
278 vote_type: VoteType::Vote,
279 };
280
281 let leader_id = state.validator_set.leader_for_view(state.current_view).id;
282 info!(
283 validator = %state.validator_id,
284 view = %state.current_view,
285 hash = %block.hash,
286 "voting for block"
287 );
288 network.send_to(leader_id, ConsensusMessage::VoteMsg(vote));
289
290 state.step = ViewStep::Voted;
291 Ok(pending_epoch)
292}
293
294pub fn on_votes_collected(
296 state: &mut ConsensusState,
297 qc: QuorumCertificate,
298 network: &dyn NetworkSink,
299 signer: &dyn Signer,
300) {
301 info!(
302 validator = %state.validator_id,
303 view = %state.current_view,
304 hash = %qc.block_hash,
305 "QC formed, broadcasting prepare"
306 );
307
308 state.update_highest_qc(&qc);
309
310 let msg_bytes = prepare_signing_bytes(&qc);
311 let signature = signer.sign(&msg_bytes);
312
313 network.broadcast(ConsensusMessage::Prepare {
314 certificate: qc,
315 signature,
316 });
317
318 state.step = ViewStep::Prepared;
319}
320
321pub fn on_prepare(
323 state: &mut ConsensusState,
324 qc: QuorumCertificate,
325 network: &dyn NetworkSink,
326 signer: &dyn Signer,
327) {
328 state.update_locked_qc(&qc);
330 state.update_highest_qc(&qc);
331
332 let vote_bytes = Vote::signing_bytes(state.current_view, &qc.block_hash, VoteType::Vote2);
334 let signature = signer.sign(&vote_bytes);
335 let vote = Vote {
336 block_hash: qc.block_hash,
337 view: state.current_view,
338 validator: state.validator_id,
339 signature,
340 vote_type: VoteType::Vote2,
341 };
342
343 let next_leader_id = leader::next_leader(&state.validator_set, state.current_view);
344 info!(
345 validator = %state.validator_id,
346 view = %state.current_view,
347 hash = %qc.block_hash,
348 "sending vote2 to next leader {}",
349 next_leader_id
350 );
351 network.send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
352
353 state.step = ViewStep::SentVote2;
354}
355
356pub(crate) fn status_signing_bytes(
359 view: ViewNumber,
360 locked_qc: &Option<QuorumCertificate>,
361) -> Vec<u8> {
362 let mut buf = Vec::new();
363 buf.push(b'S');
364 buf.extend_from_slice(&view.as_u64().to_le_bytes());
365 if let Some(qc) = locked_qc {
366 buf.extend_from_slice(&qc.block_hash.0);
367 buf.extend_from_slice(&qc.view.as_u64().to_le_bytes());
368 }
369 buf
370}
371
372pub(crate) fn proposal_signing_bytes(block: &Block, justify: &QuorumCertificate) -> Vec<u8> {
373 let mut buf = Vec::new();
374 buf.push(b'P');
375 buf.extend_from_slice(&block.hash.0);
376 buf.extend_from_slice(&justify.block_hash.0);
377 buf.extend_from_slice(&justify.view.as_u64().to_le_bytes());
378 buf
379}
380
381pub(crate) fn prepare_signing_bytes(qc: &QuorumCertificate) -> Vec<u8> {
382 let mut buf = Vec::new();
383 buf.push(b'Q');
384 buf.extend_from_slice(&qc.block_hash.0);
385 buf.extend_from_slice(&qc.view.as_u64().to_le_bytes());
386 buf
387}