1use ruc::*;
2
3use std::sync::{Arc, RwLock};
4
5use crate::application::Application;
6use crate::commit::try_commit;
7use crate::leader;
8use crate::network::NetworkSink;
9use crate::pacemaker::Pacemaker;
10use crate::state::{ConsensusState, ViewStep};
11use crate::store::BlockStore;
12use crate::view_protocol::{self, ViewEntryTrigger};
13use crate::vote_collector::VoteCollector;
14
15use hotmint_types::epoch::Epoch;
16use hotmint_types::vote::VoteType;
17use hotmint_types::*;
18use tokio::sync::mpsc;
19use tracing::{info, warn};
20
21pub type SharedBlockStore = Arc<RwLock<Box<dyn BlockStore>>>;
23
24pub struct ConsensusEngine {
25 state: ConsensusState,
26 store: SharedBlockStore,
27 network: Box<dyn NetworkSink>,
28 app: Box<dyn Application>,
29 signer: Box<dyn Signer>,
30 vote_collector: VoteCollector,
31 pacemaker: Pacemaker,
32 msg_rx: mpsc::UnboundedReceiver<(ValidatorId, ConsensusMessage)>,
33 status_count: usize,
35 current_view_qc: Option<QuorumCertificate>,
37 pending_epoch: Option<Epoch>,
39}
40
41impl ConsensusEngine {
42 pub fn new(
43 state: ConsensusState,
44 store: SharedBlockStore,
45 network: Box<dyn NetworkSink>,
46 app: Box<dyn Application>,
47 signer: Box<dyn Signer>,
48 msg_rx: mpsc::UnboundedReceiver<(ValidatorId, ConsensusMessage)>,
49 ) -> Self {
50 Self {
51 state,
52 store,
53 network,
54 app,
55 signer,
56 vote_collector: VoteCollector::new(),
57 pacemaker: Pacemaker::new(),
58 msg_rx,
59 status_count: 0,
60 current_view_qc: None,
61 pending_epoch: None,
62 }
63 }
64
65 pub async fn run(mut self) {
67 self.enter_genesis_view();
68
69 loop {
70 let deadline = self.pacemaker.sleep_until_deadline();
71 tokio::pin!(deadline);
72
73 tokio::select! {
74 Some((sender, msg)) = self.msg_rx.recv() => {
75 if let Err(e) = self.handle_message(sender, msg) {
76 warn!(validator = %self.state.validator_id, error = %e, "error handling message");
77 }
78 }
79 _ = &mut deadline => {
80 self.handle_timeout();
81 }
82 }
83 }
84 }
85
86 fn enter_genesis_view(&mut self) {
87 let genesis_qc = QuorumCertificate {
89 block_hash: BlockHash::GENESIS,
90 view: ViewNumber::GENESIS,
91 aggregate_signature: AggregateSignature::new(
92 self.state.validator_set.validator_count(),
93 ),
94 };
95 self.state.highest_qc = Some(genesis_qc);
96
97 let view = ViewNumber(1);
98 view_protocol::enter_view(
99 &mut self.state,
100 view,
101 ViewEntryTrigger::Genesis,
102 self.network.as_ref(),
103 self.signer.as_ref(),
104 );
105 self.pacemaker.reset_timer();
106
107 if self.state.is_leader() {
109 self.state.step = ViewStep::WaitingForStatus;
110 self.try_propose();
112 }
113 }
114
115 fn try_propose(&mut self) {
116 let mut store = self.store.write().unwrap();
117 match view_protocol::propose(
118 &mut self.state,
119 store.as_mut(),
120 self.network.as_ref(),
121 self.app.as_ref(),
122 self.signer.as_ref(),
123 ) {
124 Ok(block) => {
125 drop(store);
126 self.leader_self_vote(block.hash);
128 }
129 Err(e) => {
130 warn!(
131 validator = %self.state.validator_id,
132 error = %e,
133 "failed to propose"
134 );
135 }
136 }
137 }
138
139 fn leader_self_vote(&mut self, block_hash: BlockHash) {
140 let vote_bytes = Vote::signing_bytes(self.state.current_view, &block_hash, VoteType::Vote);
141 let signature = self.signer.sign(&vote_bytes);
142 let vote = Vote {
143 block_hash,
144 view: self.state.current_view,
145 validator: self.state.validator_id,
146 signature,
147 vote_type: VoteType::Vote,
148 };
149 match self
150 .vote_collector
151 .add_vote(&self.state.validator_set, vote)
152 {
153 Ok(result) => {
154 self.handle_equivocation(&result);
155 if let Some(qc) = result.qc {
156 self.on_qc_formed(qc);
157 }
158 }
159 Err(e) => warn!(error = %e, "failed to add self vote"),
160 }
161 }
162
163 fn handle_message(&mut self, _sender: ValidatorId, msg: ConsensusMessage) -> Result<()> {
164 match msg {
165 ConsensusMessage::Propose {
166 block,
167 justify,
168 double_cert,
169 signature: _,
170 } => {
171 let block = *block;
172 let justify = *justify;
173 let double_cert = double_cert.map(|dc| *dc);
174
175 if block.view > self.state.current_view {
177 if let Some(ref dc) = double_cert {
178 let store = self.store.read().unwrap();
180 match try_commit(
181 dc,
182 store.as_ref(),
183 self.app.as_ref(),
184 &mut self.state.last_committed_height,
185 &self.state.current_epoch,
186 ) {
187 Ok(result) => {
188 if result.pending_epoch.is_some() {
189 self.pending_epoch = result.pending_epoch;
190 }
191 }
192 Err(e) => {
193 warn!(error = %e, "try_commit failed during fast-forward");
194 }
195 }
196 drop(store);
197 self.state.highest_double_cert = Some(dc.clone());
198 self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()));
199 } else {
200 return Ok(());
201 }
202 } else if block.view < self.state.current_view {
203 if block.height > self.state.last_committed_height {
209 let mut store = self.store.write().unwrap();
210 store.put_block(block);
211 }
212 return Ok(());
213 }
214
215 let mut store = self.store.write().unwrap();
216 let maybe_pending = view_protocol::on_proposal(
217 &mut self.state,
218 block,
219 justify,
220 double_cert,
221 store.as_mut(),
222 self.network.as_ref(),
223 self.app.as_ref(),
224 self.signer.as_ref(),
225 )
226 .c(d!())?;
227 drop(store);
228
229 if let Some(epoch) = maybe_pending {
230 self.pending_epoch = Some(epoch);
231 }
232 }
233
234 ConsensusMessage::VoteMsg(vote) => {
235 if vote.view != self.state.current_view {
236 return Ok(());
237 }
238 if !self.state.is_leader() {
239 return Ok(());
240 }
241 if vote.vote_type != VoteType::Vote {
242 return Ok(());
243 }
244
245 let result = self
246 .vote_collector
247 .add_vote(&self.state.validator_set, vote)
248 .c(d!())?;
249 self.handle_equivocation(&result);
250 if let Some(qc) = result.qc {
251 self.on_qc_formed(qc);
252 }
253 }
254
255 ConsensusMessage::Prepare {
256 certificate,
257 signature: _,
258 } => {
259 if certificate.view < self.state.current_view {
260 return Ok(());
261 }
262 if certificate.view == self.state.current_view {
263 view_protocol::on_prepare(
264 &mut self.state,
265 certificate,
266 self.network.as_ref(),
267 self.signer.as_ref(),
268 );
269 }
270 }
271
272 ConsensusMessage::Vote2Msg(vote) => {
273 if vote.vote_type != VoteType::Vote2 {
274 return Ok(());
275 }
276 let result = self
277 .vote_collector
278 .add_vote(&self.state.validator_set, vote)
279 .c(d!())?;
280 self.handle_equivocation(&result);
281 if let Some(outer_qc) = result.qc {
282 self.on_double_cert_formed(outer_qc);
283 }
284 }
285
286 ConsensusMessage::Wish {
287 target_view,
288 validator,
289 highest_qc,
290 signature,
291 } => {
292 if let Some(tc) = self.pacemaker.add_wish(
293 &self.state.validator_set,
294 target_view,
295 validator,
296 highest_qc,
297 signature,
298 ) {
299 info!(
300 validator = %self.state.validator_id,
301 view = %tc.view,
302 "TC formed, advancing view"
303 );
304 self.network
305 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
306 self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
307 }
308 }
309
310 ConsensusMessage::TimeoutCert(tc) => {
311 if self.pacemaker.should_relay_tc(&tc) {
312 self.network
313 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
314 }
315 let new_view = ViewNumber(tc.view.as_u64() + 1);
316 if new_view > self.state.current_view {
317 self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
318 }
319 }
320
321 ConsensusMessage::StatusCert {
322 locked_qc,
323 validator: _,
324 signature: _,
325 } => {
326 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
327 if let Some(ref qc) = locked_qc {
328 self.state.update_highest_qc(qc);
329 }
330 self.status_count += 1;
331 let needed = self.state.validator_set.quorum_threshold() as usize - 1;
332 if self.status_count >= needed {
333 self.try_propose();
334 }
335 }
336 }
337 }
338 Ok(())
339 }
340
341 fn handle_equivocation(&self, result: &crate::vote_collector::VoteResult) {
342 if let Some(ref proof) = result.equivocation {
343 warn!(
344 validator = %proof.validator,
345 view = %proof.view,
346 "equivocation detected!"
347 );
348 if let Err(e) = self.app.on_evidence(proof) {
349 warn!(error = %e, "on_evidence callback failed");
350 }
351 }
352 }
353
354 fn on_qc_formed(&mut self, qc: QuorumCertificate) {
355 self.current_view_qc = Some(qc.clone());
357
358 view_protocol::on_votes_collected(
359 &mut self.state,
360 qc.clone(),
361 self.network.as_ref(),
362 self.signer.as_ref(),
363 );
364
365 let vote_bytes =
367 Vote::signing_bytes(self.state.current_view, &qc.block_hash, VoteType::Vote2);
368 let signature = self.signer.sign(&vote_bytes);
369 let vote = Vote {
370 block_hash: qc.block_hash,
371 view: self.state.current_view,
372 validator: self.state.validator_id,
373 signature,
374 vote_type: VoteType::Vote2,
375 };
376
377 self.state.update_locked_qc(&qc);
379
380 let next_leader_id =
381 leader::next_leader(&self.state.validator_set, self.state.current_view);
382 if next_leader_id == self.state.validator_id {
383 match self
385 .vote_collector
386 .add_vote(&self.state.validator_set, vote)
387 {
388 Ok(result) => {
389 self.handle_equivocation(&result);
390 if let Some(outer_qc) = result.qc {
391 self.on_double_cert_formed(outer_qc);
392 }
393 }
394 Err(e) => warn!(error = %e, "failed to add self vote2"),
395 }
396 } else {
397 self.network
398 .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
399 }
400 }
401
402 fn on_double_cert_formed(&mut self, outer_qc: QuorumCertificate) {
403 let inner_qc = match self.current_view_qc.take() {
405 Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
406 _ => {
407 match &self.state.locked_qc {
409 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
410 _ => match &self.state.highest_qc {
411 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
412 _ => {
413 warn!(
414 validator = %self.state.validator_id,
415 "double cert formed but can't find matching inner QC"
416 );
417 return;
418 }
419 },
420 }
421 }
422 };
423
424 let dc = DoubleCertificate { inner_qc, outer_qc };
425
426 info!(
427 validator = %self.state.validator_id,
428 view = %self.state.current_view,
429 hash = %dc.inner_qc.block_hash,
430 "double certificate formed, committing"
431 );
432
433 {
435 let store = self.store.read().unwrap();
436 match try_commit(
437 &dc,
438 store.as_ref(),
439 self.app.as_ref(),
440 &mut self.state.last_committed_height,
441 &self.state.current_epoch,
442 ) {
443 Ok(result) => {
444 if result.pending_epoch.is_some() {
445 self.pending_epoch = result.pending_epoch;
446 }
447 }
448 Err(e) => {
449 warn!(error = %e, "try_commit failed in double cert handler");
450 }
451 }
452 }
453
454 self.state.highest_double_cert = Some(dc.clone());
455
456 self.advance_view(ViewEntryTrigger::DoubleCert(dc));
458 }
459
460 fn handle_timeout(&mut self) {
461 info!(
462 validator = %self.state.validator_id,
463 view = %self.state.current_view,
464 "view timeout, sending wish"
465 );
466
467 let wish = self.pacemaker.build_wish(
468 self.state.current_view,
469 self.state.validator_id,
470 self.state.highest_qc.clone(),
471 self.signer.as_ref(),
472 );
473
474 self.network.broadcast(wish.clone());
475
476 if let ConsensusMessage::Wish {
478 target_view,
479 validator,
480 highest_qc,
481 signature,
482 } = wish
483 && let Some(tc) = self.pacemaker.add_wish(
484 &self.state.validator_set,
485 target_view,
486 validator,
487 highest_qc,
488 signature,
489 )
490 {
491 self.network
492 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
493 self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
494 return;
495 }
496
497 self.pacemaker.on_timeout();
499 }
500
501 fn advance_view(&mut self, trigger: ViewEntryTrigger) {
502 let new_view = match &trigger {
503 ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
504 ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
505 ViewEntryTrigger::Genesis => ViewNumber(1),
506 };
507 self.advance_view_to(new_view, trigger);
508 }
509
510 fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
511 if new_view <= self.state.current_view {
512 return;
513 }
514
515 let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
517
518 self.vote_collector.clear_view(self.state.current_view);
519 self.pacemaker.clear_view(self.state.current_view);
520 self.status_count = 0;
521 self.current_view_qc = None;
522
523 if let Some(ref epoch) = self.pending_epoch
527 && new_view >= epoch.start_view
528 {
529 let new_epoch = self.pending_epoch.take().unwrap();
530 info!(
531 validator = %self.state.validator_id,
532 old_epoch = %self.state.current_epoch.number,
533 new_epoch = %new_epoch.number,
534 start_view = %new_epoch.start_view,
535 validators = new_epoch.validator_set.validator_count(),
536 "epoch transition"
537 );
538 self.state.validator_set = new_epoch.validator_set.clone();
539 self.state.current_epoch = new_epoch;
540 self.vote_collector = VoteCollector::new();
542 self.pacemaker = Pacemaker::new();
543 }
544
545 view_protocol::enter_view(
546 &mut self.state,
547 new_view,
548 trigger,
549 self.network.as_ref(),
550 self.signer.as_ref(),
551 );
552
553 if is_progress {
554 self.pacemaker.reset_on_progress();
555 } else {
556 self.pacemaker.reset_timer();
557 }
558
559 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
561 self.try_propose();
563 }
564 }
565}