1use ruc::*;
2
3use crate::application::Application;
4use crate::commit::try_commit;
5use crate::leader;
6use crate::network::NetworkSink;
7use crate::pacemaker::Pacemaker;
8use crate::state::{ConsensusState, ViewStep};
9use crate::store::BlockStore;
10use crate::view_protocol::{self, ViewEntryTrigger};
11use crate::vote_collector::VoteCollector;
12
13use hotmint_types::vote::VoteType;
14use hotmint_types::*;
15use tokio::sync::mpsc;
16use tracing::{debug, info, warn};
17
18pub struct ConsensusEngine {
19 state: ConsensusState,
20 store: Box<dyn BlockStore>,
21 network: Box<dyn NetworkSink>,
22 app: Box<dyn Application>,
23 signer: Box<dyn Signer>,
24 vote_collector: VoteCollector,
25 pacemaker: Pacemaker,
26 msg_rx: mpsc::UnboundedReceiver<(ValidatorId, ConsensusMessage)>,
27 status_count: usize,
29 current_view_qc: Option<QuorumCertificate>,
31}
32
33impl ConsensusEngine {
34 pub fn new(
35 state: ConsensusState,
36 store: Box<dyn BlockStore>,
37 network: Box<dyn NetworkSink>,
38 app: Box<dyn Application>,
39 signer: Box<dyn Signer>,
40 msg_rx: mpsc::UnboundedReceiver<(ValidatorId, ConsensusMessage)>,
41 ) -> Self {
42 Self {
43 state,
44 store,
45 network,
46 app,
47 signer,
48 vote_collector: VoteCollector::new(),
49 pacemaker: Pacemaker::new(),
50 msg_rx,
51 status_count: 0,
52 current_view_qc: None,
53 }
54 }
55
56 pub async fn run(mut self) {
58 self.enter_genesis_view();
59
60 loop {
61 let deadline = self.pacemaker.sleep_until_deadline();
62 tokio::pin!(deadline);
63
64 tokio::select! {
65 Some((sender, msg)) = self.msg_rx.recv() => {
66 if let Err(e) = self.handle_message(sender, msg) {
67 warn!(validator = %self.state.validator_id, error = %e, "error handling message");
68 }
69 }
70 _ = &mut deadline => {
71 self.handle_timeout();
72 }
73 }
74 }
75 }
76
77 fn enter_genesis_view(&mut self) {
78 let genesis_qc = QuorumCertificate {
80 block_hash: BlockHash::GENESIS,
81 view: ViewNumber::GENESIS,
82 aggregate_signature: AggregateSignature::new(
83 self.state.validator_set.validator_count(),
84 ),
85 };
86 self.state.highest_qc = Some(genesis_qc);
87
88 let view = ViewNumber(1);
89 view_protocol::enter_view(
90 &mut self.state,
91 view,
92 ViewEntryTrigger::Genesis,
93 self.network.as_ref(),
94 self.signer.as_ref(),
95 );
96 self.pacemaker.reset_timer();
97
98 if self.state.is_leader() {
100 self.state.step = ViewStep::WaitingForStatus;
101 self.try_propose();
103 }
104 }
105
106 fn try_propose(&mut self) {
107 match view_protocol::propose(
108 &mut self.state,
109 self.store.as_mut(),
110 self.network.as_ref(),
111 self.app.as_ref(),
112 self.signer.as_ref(),
113 ) {
114 Ok(block) => {
115 self.leader_self_vote(block.hash);
117 }
118 Err(e) => {
119 warn!(
120 validator = %self.state.validator_id,
121 error = %e,
122 "failed to propose"
123 );
124 }
125 }
126 }
127
128 fn leader_self_vote(&mut self, block_hash: BlockHash) {
129 let vote_bytes = Vote::signing_bytes(self.state.current_view, &block_hash, VoteType::Vote);
130 let signature = self.signer.sign(&vote_bytes);
131 let vote = Vote {
132 block_hash,
133 view: self.state.current_view,
134 validator: self.state.validator_id,
135 signature,
136 vote_type: VoteType::Vote,
137 };
138 if let Ok(Some(formed_qc)) = self
139 .vote_collector
140 .add_vote(&self.state.validator_set, vote)
141 {
142 self.on_qc_formed(formed_qc);
143 }
144 }
145
146 fn handle_message(&mut self, _sender: ValidatorId, msg: ConsensusMessage) -> Result<()> {
147 match msg {
148 ConsensusMessage::Propose {
149 block,
150 justify,
151 double_cert,
152 signature: _,
153 } => {
154 let block = *block;
155 let justify = *justify;
156 let double_cert = double_cert.map(|dc| *dc);
157
158 if block.view > self.state.current_view {
160 if let Some(ref dc) = double_cert {
161 if let Err(e) = try_commit(
163 dc,
164 self.store.as_ref(),
165 self.app.as_ref(),
166 &mut self.state.last_committed_height,
167 ) {
168 warn!(error = %e, "try_commit failed during fast-forward");
169 }
170 self.state.highest_double_cert = Some(dc.clone());
171 self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()));
172 } else {
173 debug!(
174 validator = %self.state.validator_id,
175 block_view = %block.view,
176 current_view = %self.state.current_view,
177 "ignoring proposal from future view without double cert"
178 );
179 return Ok(());
180 }
181 } else if block.view < self.state.current_view {
182 return Ok(());
183 }
184
185 view_protocol::on_proposal(
186 &mut self.state,
187 block,
188 justify,
189 double_cert,
190 self.store.as_mut(),
191 self.network.as_ref(),
192 self.app.as_ref(),
193 self.signer.as_ref(),
194 )
195 .c(d!())?;
196 }
197
198 ConsensusMessage::VoteMsg(vote) => {
199 if vote.view != self.state.current_view {
200 return Ok(());
201 }
202 if !self.state.is_leader() {
203 return Ok(());
204 }
205 if vote.vote_type != VoteType::Vote {
206 return Ok(());
207 }
208
209 if let Some(qc) = self
210 .vote_collector
211 .add_vote(&self.state.validator_set, vote)
212 .c(d!())?
213 {
214 self.on_qc_formed(qc);
215 }
216 }
217
218 ConsensusMessage::Prepare {
219 certificate,
220 signature: _,
221 } => {
222 if certificate.view < self.state.current_view {
223 return Ok(());
224 }
225 if certificate.view == self.state.current_view {
227 view_protocol::on_prepare(
228 &mut self.state,
229 certificate,
230 self.network.as_ref(),
231 self.signer.as_ref(),
232 );
233 }
234 }
237
238 ConsensusMessage::Vote2Msg(vote) => {
239 if vote.vote_type != VoteType::Vote2 {
240 return Ok(());
241 }
242 if let Some(outer_qc) = self
245 .vote_collector
246 .add_vote(&self.state.validator_set, vote)
247 .c(d!())?
248 {
249 self.on_double_cert_formed(outer_qc);
250 }
251 }
252
253 ConsensusMessage::Wish {
254 target_view,
255 validator,
256 highest_qc,
257 signature,
258 } => {
259 if let Some(tc) = self.pacemaker.add_wish(
260 &self.state.validator_set,
261 target_view,
262 validator,
263 highest_qc,
264 signature,
265 ) {
266 info!(
267 validator = %self.state.validator_id,
268 view = %tc.view,
269 "TC formed, advancing view"
270 );
271 self.network
272 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
273 self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
274 }
275 }
276
277 ConsensusMessage::TimeoutCert(tc) => {
278 if self.pacemaker.should_relay_tc(&tc) {
280 self.network
281 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
282 }
283 let new_view = ViewNumber(tc.view.as_u64() + 1);
284 if new_view > self.state.current_view {
285 self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
286 }
287 }
288
289 ConsensusMessage::StatusCert {
290 locked_qc,
291 validator: _,
292 signature: _,
293 } => {
294 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
295 if let Some(ref qc) = locked_qc {
296 self.state.update_highest_qc(qc);
297 }
298 self.status_count += 1;
299 let needed = self.state.validator_set.quorum_threshold() as usize - 1;
300 if self.status_count >= needed {
301 self.try_propose();
302 }
303 }
304 }
305 }
306 Ok(())
307 }
308
309 fn on_qc_formed(&mut self, qc: QuorumCertificate) {
310 self.current_view_qc = Some(qc.clone());
312
313 view_protocol::on_votes_collected(
314 &mut self.state,
315 qc.clone(),
316 self.network.as_ref(),
317 self.signer.as_ref(),
318 );
319
320 let vote_bytes =
322 Vote::signing_bytes(self.state.current_view, &qc.block_hash, VoteType::Vote2);
323 let signature = self.signer.sign(&vote_bytes);
324 let vote = Vote {
325 block_hash: qc.block_hash,
326 view: self.state.current_view,
327 validator: self.state.validator_id,
328 signature,
329 vote_type: VoteType::Vote2,
330 };
331
332 self.state.update_locked_qc(&qc);
334
335 let next_leader_id =
336 leader::next_leader(&self.state.validator_set, self.state.current_view);
337 if next_leader_id == self.state.validator_id {
338 if let Ok(Some(outer_qc)) = self
340 .vote_collector
341 .add_vote(&self.state.validator_set, vote)
342 {
343 self.on_double_cert_formed(outer_qc);
344 }
345 } else {
346 self.network
347 .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
348 }
349 }
350
351 fn on_double_cert_formed(&mut self, outer_qc: QuorumCertificate) {
352 let inner_qc = match self.current_view_qc.take() {
354 Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
355 _ => {
356 match &self.state.locked_qc {
358 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
359 _ => match &self.state.highest_qc {
360 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
361 _ => {
362 warn!(
363 validator = %self.state.validator_id,
364 "double cert formed but can't find matching inner QC"
365 );
366 return;
367 }
368 },
369 }
370 }
371 };
372
373 let dc = DoubleCertificate { inner_qc, outer_qc };
374
375 info!(
376 validator = %self.state.validator_id,
377 view = %self.state.current_view,
378 hash = %dc.inner_qc.block_hash,
379 "double certificate formed, committing"
380 );
381
382 if let Err(e) = try_commit(
384 &dc,
385 self.store.as_ref(),
386 self.app.as_ref(),
387 &mut self.state.last_committed_height,
388 ) {
389 warn!(error = %e, "try_commit failed in double cert handler");
390 }
391
392 self.state.highest_double_cert = Some(dc.clone());
393
394 self.advance_view(ViewEntryTrigger::DoubleCert(dc));
397 }
398
399 fn handle_timeout(&mut self) {
400 info!(
401 validator = %self.state.validator_id,
402 view = %self.state.current_view,
403 "view timeout, sending wish"
404 );
405
406 let wish = self.pacemaker.build_wish(
407 self.state.current_view,
408 self.state.validator_id,
409 self.state.highest_qc.clone(),
410 self.signer.as_ref(),
411 );
412
413 self.network.broadcast(wish.clone());
414
415 if let ConsensusMessage::Wish {
417 target_view,
418 validator,
419 highest_qc,
420 signature,
421 } = wish
422 && let Some(tc) = self.pacemaker.add_wish(
423 &self.state.validator_set,
424 target_view,
425 validator,
426 highest_qc,
427 signature,
428 )
429 {
430 self.network
431 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
432 self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
433 return;
434 }
435
436 self.pacemaker.on_timeout();
438 }
439
440 fn advance_view(&mut self, trigger: ViewEntryTrigger) {
441 let new_view = match &trigger {
442 ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
443 ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
444 ViewEntryTrigger::Genesis => ViewNumber(1),
445 };
446 self.advance_view_to(new_view, trigger);
447 }
448
449 fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
450 if new_view <= self.state.current_view {
451 return;
452 }
453
454 let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
456
457 self.vote_collector.clear_view(self.state.current_view);
458 self.pacemaker.clear_view(self.state.current_view);
459 self.status_count = 0;
460 self.current_view_qc = None;
461
462 view_protocol::enter_view(
463 &mut self.state,
464 new_view,
465 trigger,
466 self.network.as_ref(),
467 self.signer.as_ref(),
468 );
469
470 if is_progress {
471 self.pacemaker.reset_on_progress();
472 } else {
473 self.pacemaker.reset_timer();
474 }
475
476 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
478 self.try_propose();
480 }
481 }
482}