1use ruc::*;
2
3use std::sync::{Arc, RwLock};
4
5use crate::application::Application;
6use crate::commit::try_commit;
7use crate::leader;
8use crate::network::NetworkSink;
9use crate::pacemaker::Pacemaker;
10use crate::state::{ConsensusState, ViewStep};
11use crate::store::BlockStore;
12use crate::view_protocol::{self, ViewEntryTrigger};
13use crate::vote_collector::VoteCollector;
14
15use hotmint_types::epoch::Epoch;
16use hotmint_types::vote::VoteType;
17use hotmint_types::*;
18use tokio::sync::mpsc;
19use tracing::{info, warn};
20
21pub type SharedBlockStore = Arc<RwLock<Box<dyn BlockStore>>>;
23
24pub struct ConsensusEngine {
25 state: ConsensusState,
26 store: SharedBlockStore,
27 network: Box<dyn NetworkSink>,
28 app: Box<dyn Application>,
29 signer: Box<dyn Signer>,
30 vote_collector: VoteCollector,
31 pacemaker: Pacemaker,
32 msg_rx: mpsc::UnboundedReceiver<(ValidatorId, ConsensusMessage)>,
33 status_count: usize,
35 current_view_qc: Option<QuorumCertificate>,
37 pending_epoch: Option<Epoch>,
39}
40
41impl ConsensusEngine {
42 pub fn new(
43 state: ConsensusState,
44 store: SharedBlockStore,
45 network: Box<dyn NetworkSink>,
46 app: Box<dyn Application>,
47 signer: Box<dyn Signer>,
48 msg_rx: mpsc::UnboundedReceiver<(ValidatorId, ConsensusMessage)>,
49 ) -> Self {
50 Self {
51 state,
52 store,
53 network,
54 app,
55 signer,
56 vote_collector: VoteCollector::new(),
57 pacemaker: Pacemaker::new(),
58 msg_rx,
59 status_count: 0,
60 current_view_qc: None,
61 pending_epoch: None,
62 }
63 }
64
65 pub async fn run(mut self) {
67 self.enter_genesis_view();
68
69 loop {
70 let deadline = self.pacemaker.sleep_until_deadline();
71 tokio::pin!(deadline);
72
73 tokio::select! {
74 Some((sender, msg)) = self.msg_rx.recv() => {
75 if let Err(e) = self.handle_message(sender, msg) {
76 warn!(validator = %self.state.validator_id, error = %e, "error handling message");
77 }
78 }
79 _ = &mut deadline => {
80 self.handle_timeout();
81 }
82 }
83 }
84 }
85
86 fn enter_genesis_view(&mut self) {
87 let genesis_qc = QuorumCertificate {
89 block_hash: BlockHash::GENESIS,
90 view: ViewNumber::GENESIS,
91 aggregate_signature: AggregateSignature::new(
92 self.state.validator_set.validator_count(),
93 ),
94 };
95 self.state.highest_qc = Some(genesis_qc);
96
97 let view = ViewNumber(1);
98 view_protocol::enter_view(
99 &mut self.state,
100 view,
101 ViewEntryTrigger::Genesis,
102 self.network.as_ref(),
103 self.signer.as_ref(),
104 );
105 self.pacemaker.reset_timer();
106
107 if self.state.is_leader() {
109 self.state.step = ViewStep::WaitingForStatus;
110 self.try_propose();
112 }
113 }
114
115 fn try_propose(&mut self) {
116 let mut store = self.store.write().unwrap();
117 match view_protocol::propose(
118 &mut self.state,
119 store.as_mut(),
120 self.network.as_ref(),
121 self.app.as_ref(),
122 self.signer.as_ref(),
123 ) {
124 Ok(block) => {
125 drop(store);
126 self.leader_self_vote(block.hash);
128 }
129 Err(e) => {
130 warn!(
131 validator = %self.state.validator_id,
132 error = %e,
133 "failed to propose"
134 );
135 }
136 }
137 }
138
139 fn leader_self_vote(&mut self, block_hash: BlockHash) {
140 let vote_bytes = Vote::signing_bytes(self.state.current_view, &block_hash, VoteType::Vote);
141 let signature = self.signer.sign(&vote_bytes);
142 let vote = Vote {
143 block_hash,
144 view: self.state.current_view,
145 validator: self.state.validator_id,
146 signature,
147 vote_type: VoteType::Vote,
148 };
149 match self
150 .vote_collector
151 .add_vote(&self.state.validator_set, vote)
152 {
153 Ok(result) => {
154 self.handle_equivocation(&result);
155 if let Some(qc) = result.qc {
156 self.on_qc_formed(qc);
157 }
158 }
159 Err(e) => warn!(error = %e, "failed to add self vote"),
160 }
161 }
162
163 fn handle_message(&mut self, _sender: ValidatorId, msg: ConsensusMessage) -> Result<()> {
164 match msg {
165 ConsensusMessage::Propose {
166 block,
167 justify,
168 double_cert,
169 signature: _,
170 } => {
171 let block = *block;
172 let justify = *justify;
173 let double_cert = double_cert.map(|dc| *dc);
174
175 if block.view > self.state.current_view {
177 if let Some(ref dc) = double_cert {
178 let store = self.store.read().unwrap();
180 match try_commit(
181 dc,
182 store.as_ref(),
183 self.app.as_ref(),
184 &mut self.state.last_committed_height,
185 &self.state.current_epoch,
186 ) {
187 Ok(result) => {
188 if result.pending_epoch.is_some() {
189 self.pending_epoch = result.pending_epoch;
190 }
191 }
192 Err(e) => {
193 warn!(error = %e, "try_commit failed during fast-forward");
194 }
195 }
196 drop(store);
197 self.state.highest_double_cert = Some(dc.clone());
198 self.advance_view_to(block.view, ViewEntryTrigger::DoubleCert(dc.clone()));
199 } else {
200 return Ok(());
201 }
202 } else if block.view < self.state.current_view {
203 return Ok(());
204 }
205
206 let mut store = self.store.write().unwrap();
207 let maybe_pending = view_protocol::on_proposal(
208 &mut self.state,
209 block,
210 justify,
211 double_cert,
212 store.as_mut(),
213 self.network.as_ref(),
214 self.app.as_ref(),
215 self.signer.as_ref(),
216 )
217 .c(d!())?;
218 drop(store);
219
220 if let Some(epoch) = maybe_pending {
221 self.pending_epoch = Some(epoch);
222 }
223 }
224
225 ConsensusMessage::VoteMsg(vote) => {
226 if vote.view != self.state.current_view {
227 return Ok(());
228 }
229 if !self.state.is_leader() {
230 return Ok(());
231 }
232 if vote.vote_type != VoteType::Vote {
233 return Ok(());
234 }
235
236 let result = self
237 .vote_collector
238 .add_vote(&self.state.validator_set, vote)
239 .c(d!())?;
240 self.handle_equivocation(&result);
241 if let Some(qc) = result.qc {
242 self.on_qc_formed(qc);
243 }
244 }
245
246 ConsensusMessage::Prepare {
247 certificate,
248 signature: _,
249 } => {
250 if certificate.view < self.state.current_view {
251 return Ok(());
252 }
253 if certificate.view == self.state.current_view {
254 view_protocol::on_prepare(
255 &mut self.state,
256 certificate,
257 self.network.as_ref(),
258 self.signer.as_ref(),
259 );
260 }
261 }
262
263 ConsensusMessage::Vote2Msg(vote) => {
264 if vote.vote_type != VoteType::Vote2 {
265 return Ok(());
266 }
267 let result = self
268 .vote_collector
269 .add_vote(&self.state.validator_set, vote)
270 .c(d!())?;
271 self.handle_equivocation(&result);
272 if let Some(outer_qc) = result.qc {
273 self.on_double_cert_formed(outer_qc);
274 }
275 }
276
277 ConsensusMessage::Wish {
278 target_view,
279 validator,
280 highest_qc,
281 signature,
282 } => {
283 if let Some(tc) = self.pacemaker.add_wish(
284 &self.state.validator_set,
285 target_view,
286 validator,
287 highest_qc,
288 signature,
289 ) {
290 info!(
291 validator = %self.state.validator_id,
292 view = %tc.view,
293 "TC formed, advancing view"
294 );
295 self.network
296 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
297 self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
298 }
299 }
300
301 ConsensusMessage::TimeoutCert(tc) => {
302 if self.pacemaker.should_relay_tc(&tc) {
303 self.network
304 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
305 }
306 let new_view = ViewNumber(tc.view.as_u64() + 1);
307 if new_view > self.state.current_view {
308 self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
309 }
310 }
311
312 ConsensusMessage::StatusCert {
313 locked_qc,
314 validator: _,
315 signature: _,
316 } => {
317 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
318 if let Some(ref qc) = locked_qc {
319 self.state.update_highest_qc(qc);
320 }
321 self.status_count += 1;
322 let needed = self.state.validator_set.quorum_threshold() as usize - 1;
323 if self.status_count >= needed {
324 self.try_propose();
325 }
326 }
327 }
328 }
329 Ok(())
330 }
331
332 fn handle_equivocation(&self, result: &crate::vote_collector::VoteResult) {
333 if let Some(ref proof) = result.equivocation {
334 warn!(
335 validator = %proof.validator,
336 view = %proof.view,
337 "equivocation detected!"
338 );
339 if let Err(e) = self.app.on_evidence(proof) {
340 warn!(error = %e, "on_evidence callback failed");
341 }
342 }
343 }
344
345 fn on_qc_formed(&mut self, qc: QuorumCertificate) {
346 self.current_view_qc = Some(qc.clone());
348
349 view_protocol::on_votes_collected(
350 &mut self.state,
351 qc.clone(),
352 self.network.as_ref(),
353 self.signer.as_ref(),
354 );
355
356 let vote_bytes =
358 Vote::signing_bytes(self.state.current_view, &qc.block_hash, VoteType::Vote2);
359 let signature = self.signer.sign(&vote_bytes);
360 let vote = Vote {
361 block_hash: qc.block_hash,
362 view: self.state.current_view,
363 validator: self.state.validator_id,
364 signature,
365 vote_type: VoteType::Vote2,
366 };
367
368 self.state.update_locked_qc(&qc);
370
371 let next_leader_id =
372 leader::next_leader(&self.state.validator_set, self.state.current_view);
373 if next_leader_id == self.state.validator_id {
374 match self
376 .vote_collector
377 .add_vote(&self.state.validator_set, vote)
378 {
379 Ok(result) => {
380 self.handle_equivocation(&result);
381 if let Some(outer_qc) = result.qc {
382 self.on_double_cert_formed(outer_qc);
383 }
384 }
385 Err(e) => warn!(error = %e, "failed to add self vote2"),
386 }
387 } else {
388 self.network
389 .send_to(next_leader_id, ConsensusMessage::Vote2Msg(vote));
390 }
391 }
392
393 fn on_double_cert_formed(&mut self, outer_qc: QuorumCertificate) {
394 let inner_qc = match self.current_view_qc.take() {
396 Some(qc) if qc.block_hash == outer_qc.block_hash => qc,
397 _ => {
398 match &self.state.locked_qc {
400 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
401 _ => match &self.state.highest_qc {
402 Some(qc) if qc.block_hash == outer_qc.block_hash => qc.clone(),
403 _ => {
404 warn!(
405 validator = %self.state.validator_id,
406 "double cert formed but can't find matching inner QC"
407 );
408 return;
409 }
410 },
411 }
412 }
413 };
414
415 let dc = DoubleCertificate { inner_qc, outer_qc };
416
417 info!(
418 validator = %self.state.validator_id,
419 view = %self.state.current_view,
420 hash = %dc.inner_qc.block_hash,
421 "double certificate formed, committing"
422 );
423
424 {
426 let store = self.store.read().unwrap();
427 match try_commit(
428 &dc,
429 store.as_ref(),
430 self.app.as_ref(),
431 &mut self.state.last_committed_height,
432 &self.state.current_epoch,
433 ) {
434 Ok(result) => {
435 if result.pending_epoch.is_some() {
436 self.pending_epoch = result.pending_epoch;
437 }
438 }
439 Err(e) => {
440 warn!(error = %e, "try_commit failed in double cert handler");
441 }
442 }
443 }
444
445 self.state.highest_double_cert = Some(dc.clone());
446
447 self.advance_view(ViewEntryTrigger::DoubleCert(dc));
449 }
450
451 fn handle_timeout(&mut self) {
452 info!(
453 validator = %self.state.validator_id,
454 view = %self.state.current_view,
455 "view timeout, sending wish"
456 );
457
458 let wish = self.pacemaker.build_wish(
459 self.state.current_view,
460 self.state.validator_id,
461 self.state.highest_qc.clone(),
462 self.signer.as_ref(),
463 );
464
465 self.network.broadcast(wish.clone());
466
467 if let ConsensusMessage::Wish {
469 target_view,
470 validator,
471 highest_qc,
472 signature,
473 } = wish
474 && let Some(tc) = self.pacemaker.add_wish(
475 &self.state.validator_set,
476 target_view,
477 validator,
478 highest_qc,
479 signature,
480 )
481 {
482 self.network
483 .broadcast(ConsensusMessage::TimeoutCert(tc.clone()));
484 self.advance_view(ViewEntryTrigger::TimeoutCert(tc));
485 return;
486 }
487
488 self.pacemaker.on_timeout();
490 }
491
492 fn advance_view(&mut self, trigger: ViewEntryTrigger) {
493 let new_view = match &trigger {
494 ViewEntryTrigger::DoubleCert(_) => self.state.current_view.next(),
495 ViewEntryTrigger::TimeoutCert(tc) => ViewNumber(tc.view.as_u64() + 1),
496 ViewEntryTrigger::Genesis => ViewNumber(1),
497 };
498 self.advance_view_to(new_view, trigger);
499 }
500
501 fn advance_view_to(&mut self, new_view: ViewNumber, trigger: ViewEntryTrigger) {
502 if new_view <= self.state.current_view {
503 return;
504 }
505
506 let is_progress = matches!(&trigger, ViewEntryTrigger::DoubleCert(_));
508
509 self.vote_collector.clear_view(self.state.current_view);
510 self.pacemaker.clear_view(self.state.current_view);
511 self.status_count = 0;
512 self.current_view_qc = None;
513
514 if let Some(mut new_epoch) = self.pending_epoch.take() {
516 new_epoch.start_view = new_view;
517 info!(
518 validator = %self.state.validator_id,
519 old_epoch = %self.state.current_epoch.number,
520 new_epoch = %new_epoch.number,
521 start_view = %new_view,
522 validators = new_epoch.validator_set.validator_count(),
523 "epoch transition"
524 );
525 self.state.validator_set = new_epoch.validator_set.clone();
526 self.state.current_epoch = new_epoch;
527 self.vote_collector = VoteCollector::new();
529 self.pacemaker = Pacemaker::new();
530 }
531
532 view_protocol::enter_view(
533 &mut self.state,
534 new_view,
535 trigger,
536 self.network.as_ref(),
537 self.signer.as_ref(),
538 );
539
540 if is_progress {
541 self.pacemaker.reset_on_progress();
542 } else {
543 self.pacemaker.reset_timer();
544 }
545
546 if self.state.is_leader() && self.state.step == ViewStep::WaitingForStatus {
548 self.try_propose();
550 }
551 }
552}