1use ruc::*;
2
3use crate::application::Application;
4use crate::commit::try_commit;
5use crate::leader;
6use crate::network::NetworkSink;
7use crate::state::{ConsensusState, ViewRole, ViewStep};
8use crate::store::BlockStore;
9use hotmint_crypto::hash::hash_block;
10use hotmint_types::context::BlockContext;
11use hotmint_types::epoch::Epoch;
12use hotmint_types::vote::VoteType;
13use hotmint_types::*;
14use tracing::{debug, info, warn};
15
16pub enum ViewEntryTrigger {
18 DoubleCert(DoubleCertificate),
19 TimeoutCert(TimeoutCertificate),
20 Genesis,
21}
22
23pub fn enter_view(
25 state: &mut ConsensusState,
26 view: ViewNumber,
27 trigger: ViewEntryTrigger,
28 network: &dyn NetworkSink,
29 signer: &dyn Signer,
30) {
31 state.current_view = view;
32 state.step = ViewStep::Entered;
33
34 let am_leader = leader::is_leader(&state.validator_set, view, state.validator_id);
35 state.role = if am_leader {
36 ViewRole::Leader
37 } else {
38 ViewRole::Replica
39 };
40
41 info!(
42 validator = %state.validator_id,
43 view = %view,
44 role = ?state.role,
45 epoch = %state.current_epoch.number,
46 "entering view"
47 );
48
49 match trigger {
50 ViewEntryTrigger::Genesis => {
51 if am_leader {
52 state.step = ViewStep::WaitingForStatus;
53 state.step = ViewStep::Proposed; } else {
56 state.step = ViewStep::WaitingForProposal;
57 }
58 }
59 ViewEntryTrigger::DoubleCert(dc) => {
60 state.update_highest_qc(&dc.outer_qc);
61 state.highest_double_cert = Some(dc);
62 if am_leader {
63 state.step = ViewStep::WaitingForStatus;
64 } else {
65 let leader_id = state.validator_set.leader_for_view(view).id;
67 let msg_bytes = status_signing_bytes(view, &state.locked_qc);
68 let sig = signer.sign(&msg_bytes);
69 network.send_to(
70 leader_id,
71 ConsensusMessage::StatusCert {
72 locked_qc: state.locked_qc.clone(),
73 validator: state.validator_id,
74 signature: sig,
75 },
76 );
77 state.step = ViewStep::WaitingForProposal;
78 }
79 }
80 ViewEntryTrigger::TimeoutCert(tc) => {
81 if let Some(hqc) = tc.highest_qc() {
82 state.update_highest_qc(hqc);
83 }
84 if am_leader {
85 state.step = ViewStep::WaitingForStatus;
86 } else {
87 let leader_id = state.validator_set.leader_for_view(view).id;
88 let msg_bytes = status_signing_bytes(view, &state.locked_qc);
89 let sig = signer.sign(&msg_bytes);
90 network.send_to(
91 leader_id,
92 ConsensusMessage::StatusCert {
93 locked_qc: state.locked_qc.clone(),
94 validator: state.validator_id,
95 signature: sig,
96 },
97 );
98 state.step = ViewStep::WaitingForProposal;
99 }
100 }
101 }
102}
103
104pub fn propose(
106 state: &mut ConsensusState,
107 store: &mut dyn BlockStore,
108 network: &dyn NetworkSink,
109 app: &dyn Application,
110 signer: &dyn Signer,
111) -> Result<Block> {
112 let justify = state
113 .highest_qc
114 .clone()
115 .c(d!("no QC to justify proposal"))?;
116
117 let parent_hash = justify.block_hash;
118 let parent = store
119 .get_block(&parent_hash)
120 .c(d!("parent block not found"))?;
121 let height = parent.height.next();
122
123 let ctx = BlockContext {
124 height,
125 view: state.current_view,
126 proposer: state.validator_id,
127 epoch: state.current_epoch.number,
128 validator_set: &state.validator_set,
129 };
130
131 let payload = app.create_payload(&ctx);
132
133 let mut block = Block {
134 height,
135 parent_hash,
136 view: state.current_view,
137 proposer: state.validator_id,
138 payload,
139 hash: BlockHash::GENESIS, };
141 block.hash = hash_block(&block);
142
143 store.put_block(block.clone());
144
145 let msg_bytes = proposal_signing_bytes(&block, &justify);
146 let signature = signer.sign(&msg_bytes);
147
148 info!(
149 validator = %state.validator_id,
150 view = %state.current_view,
151 height = height.as_u64(),
152 hash = %block.hash,
153 "proposing block"
154 );
155
156 network.broadcast(ConsensusMessage::Propose {
157 block: Box::new(block.clone()),
158 justify: Box::new(justify),
159 double_cert: state.highest_double_cert.clone().map(Box::new),
160 signature,
161 });
162
163 state.step = ViewStep::CollectingVotes;
164 Ok(block)
165}
166
167#[allow(clippy::too_many_arguments)]
170pub fn on_proposal(
171 state: &mut ConsensusState,
172 block: Block,
173 justify: QuorumCertificate,
174 double_cert: Option<DoubleCertificate>,
175 store: &mut dyn BlockStore,
176 network: &dyn NetworkSink,
177 app: &dyn Application,
178 signer: &dyn Signer,
179) -> Result<Option<Epoch>> {
180 if state.step != ViewStep::WaitingForProposal {
181 debug!(
182 validator = %state.validator_id,
183 step = ?state.step,
184 "ignoring proposal, not waiting"
185 );
186 return Ok(None);
187 }
188
189 if let Some(ref locked) = state.locked_qc
191 && justify.rank() < locked.rank()
192 {
193 warn!(
194 validator = %state.validator_id,
195 justify_view = %justify.view,
196 locked_view = %locked.view,
197 "rejecting proposal: justify rank < locked rank"
198 );
199 return Err(eg!("proposal justify rank below locked QC rank"));
200 }
201
202 let ctx = BlockContext {
203 height: block.height,
204 view: block.view,
205 proposer: block.proposer,
206 epoch: state.current_epoch.number,
207 validator_set: &state.validator_set,
208 };
209
210 if !app.validate_block(&block, &ctx) {
211 return Err(eg!("application rejected block"));
212 }
213
214 store.put_block(block.clone());
216
217 state.update_highest_qc(&justify);
219
220 let mut pending_epoch = None;
222 if let Some(ref dc) = double_cert {
223 match try_commit(
224 dc,
225 store,
226 app,
227 &mut state.last_committed_height,
228 &state.current_epoch,
229 ) {
230 Ok(result) => {
231 pending_epoch = result.pending_epoch;
232 }
233 Err(e) => {
234 warn!(error = %e, "try_commit failed during fast-forward in on_proposal");
235 }
236 }
237 }
238
239 let vote_bytes = Vote::signing_bytes(state.current_view, &block.hash, VoteType::Vote);
241 let signature = signer.sign(&vote_bytes);
242 let vote = Vote {
243 block_hash: block.hash,
244 view: state.current_view,
245 validator: state.validator_id,
246 signature,
247 vote_type: VoteType::Vote,
248 };
249
250 let leader_id = state.validator_set.leader_for_view(state.current_view).id;
251 info!(
252 validator = %state.validator_id,
253 view = %state.current_view,
254 hash = %block.hash,
255 "voting for block"
256 );
257 network.send_to(leader_id, ConsensusMessage::VoteMsg(vote));
258
259 state.step = ViewStep::Voted;
260 Ok(pending_epoch)
261}
262
263pub fn on_votes_collected(
265 state: &mut ConsensusState,
266 qc: QuorumCertificate,
267 network: &dyn NetworkSink,
268 signer: &dyn Signer,
269) {
270 info!(
271 validator = %state.validator_id,
272 view = %state.current_view,
273 hash = %qc.block_hash,
274 "QC formed, broadcasting prepare"
275 );
276
277 state.update_highest_qc(&qc);
278
279 let msg_bytes = prepare_signing_bytes(&qc);
280 let signature = signer.sign(&msg_bytes);
281
282 network.broadcast(ConsensusMessage::Prepare {
283 certificate: qc,
284 signature,
285 });
286
287 state.step = ViewStep::Prepared;
288}
289
290pub fn on_prepare(
292 state: &mut ConsensusState,
293 qc: QuorumCertificate,
294 network: &dyn NetworkSink,
295 signer: &dyn Signer,
296) {
297 state.update_locked_qc(&qc);
299 state.update_highest_qc(&qc);
300
301 let vote_bytes = Vote::signing_bytes(state.current_view, &qc.block_hash, VoteType::Vote2);
303 let signature = signer.sign(&vote_bytes);
304 let vote = Vote {
305 block_hash: qc.block_hash,
306 view: state.current_view,
307 validator: state.validator_id,
308 signature,
309 vote_type: VoteType::Vote2,
310 };
311
312 let next_leader_id = leader::next_leader(&state.validator_set, state.current_view);
313 info!(
314 validator = %state.validator_id,
315 view = %state.current_view,
316 hash = %qc.block_hash,
317 "sending vote2 to next leader {}",
318 next_leader_id
319 );
320 network.send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
321
322 state.step = ViewStep::SentVote2;
323}
324
325fn status_signing_bytes(view: ViewNumber, locked_qc: &Option<QuorumCertificate>) -> Vec<u8> {
328 let mut buf = Vec::new();
329 buf.push(b'S');
330 buf.extend_from_slice(&view.as_u64().to_le_bytes());
331 if let Some(qc) = locked_qc {
332 buf.extend_from_slice(&qc.block_hash.0);
333 buf.extend_from_slice(&qc.view.as_u64().to_le_bytes());
334 }
335 buf
336}
337
338fn proposal_signing_bytes(block: &Block, justify: &QuorumCertificate) -> Vec<u8> {
339 let mut buf = Vec::new();
340 buf.push(b'P');
341 buf.extend_from_slice(&block.hash.0);
342 buf.extend_from_slice(&justify.block_hash.0);
343 buf.extend_from_slice(&justify.view.as_u64().to_le_bytes());
344 buf
345}
346
347fn prepare_signing_bytes(qc: &QuorumCertificate) -> Vec<u8> {
348 let mut buf = Vec::new();
349 buf.push(b'Q');
350 buf.extend_from_slice(&qc.block_hash.0);
351 buf.extend_from_slice(&qc.view.as_u64().to_le_bytes());
352 buf
353}