1use core::fmt;
2use std::collections::BTreeSet;
3use std::future::{pending, Future};
4use std::io;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_recursion::async_recursion;
9use async_trait::async_trait;
10use derive_where::derive_where;
11use eyre::eyre;
12use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
13use tokio::time::Instant;
14use tracing::{debug, error, error_span, info};
15
16use malachitebft_codec as codec;
17use malachitebft_config::ConsensusConfig;
18use malachitebft_core_consensus::{
19 Effect, LivenessMsg, PeerId, Resumable, Resume, SignedConsensusMsg, VoteExtensionError,
20};
21use malachitebft_core_types::{
22 Context, Proposal, Round, Timeout, TimeoutKind, Timeouts, ValidatorSet, Validity, Value,
23 ValueId, ValueOrigin, ValueResponse as CoreValueResponse, Vote,
24};
25use malachitebft_metrics::Metrics;
26use malachitebft_signing::{SigningProvider, SigningProviderExt};
27use malachitebft_sync::HeightStartType;
28
29use crate::host::{HeightParams, HostMsg, HostRef, LocallyProposedValue, Next, ProposedValue};
30use crate::network::{NetworkEvent, NetworkMsg, NetworkRef};
31use crate::sync::Msg as SyncMsg;
32use crate::util::events::{Event, TxEvent};
33use crate::util::msg_buffer::MessageBuffer;
34use crate::util::output_port::OutputPort;
35use crate::util::streaming::StreamMessage;
36use crate::util::timers::{TimeoutElapsed, TimerScheduler};
37use crate::wal::{Msg as WalMsg, WalEntry, WalRef};
38
39pub use malachitebft_core_consensus::Error as ConsensusError;
40pub use malachitebft_core_consensus::Params as ConsensusParams;
41pub use malachitebft_core_consensus::State as ConsensusState;
42
43pub mod state_dump;
44use state_dump::StateDump;
45
46pub trait ConsensusCodec<Ctx>
54where
55 Ctx: Context,
56 Self: codec::Codec<Ctx::ProposalPart>,
57 Self: codec::Codec<SignedConsensusMsg<Ctx>>,
58 Self: codec::Codec<LivenessMsg<Ctx>>,
59 Self: codec::Codec<StreamMessage<Ctx::ProposalPart>>,
60{
61}
62
63impl<Ctx, Codec> ConsensusCodec<Ctx> for Codec
64where
65 Ctx: Context,
66 Self: codec::Codec<Ctx::ProposalPart>,
67 Self: codec::Codec<SignedConsensusMsg<Ctx>>,
68 Self: codec::Codec<LivenessMsg<Ctx>>,
69 Self: codec::Codec<StreamMessage<Ctx::ProposalPart>>,
70{
71}
72
/// Handle used to send [`Msg`] values to the consensus actor.
pub type ConsensusRef<Ctx> = ActorRef<Msg<Ctx>>;
74
/// The consensus actor.
///
/// Holds the immutable configuration and the handles to the other actors
/// (network, host, WAL, sync) that the consensus logic talks to. The mutable,
/// per-run data lives in [`State`].
pub struct Consensus<Ctx>
where
    Ctx: Context,
{
    /// Application context, cloned into the consensus state and passed to
    /// the certificate verification routines.
    ctx: Ctx,
    /// Core consensus parameters (e.g. the value payload mode).
    params: ConsensusParams<Ctx>,
    /// Node-level consensus configuration (e.g. the input queue capacity).
    consensus_config: ConsensusConfig,
    /// Signs and verifies votes, proposals, vote extensions and certificates.
    signing_provider: Box<dyn SigningProvider<Ctx>>,
    /// Handle to the network actor, used to publish messages and to push
    /// validator set updates.
    network: NetworkRef<Ctx>,
    /// Handle to the host (application) actor.
    host: HostRef<Ctx>,
    /// Handle to the write-ahead log actor.
    wal: WalRef<Ctx>,
    /// Output port used to notify the sync actor.
    sync: Arc<OutputPort<SyncMsg<Ctx>>>,
    /// Metrics updated while processing inputs and effects.
    metrics: Metrics,
    /// Channel on which consensus [`Event`]s are emitted.
    tx_event: TxEvent<Ctx>,
    /// Parent tracing span for everything logged by this actor.
    span: tracing::Span,
}
91
/// Alias of [`Msg`], the message type accepted by the consensus actor.
pub type ConsensusMsg<Ctx> = Msg<Ctx>;
93
/// Messages handled by the consensus actor.
#[derive_where(Debug)]
pub enum Msg<Ctx: Context> {
    /// Start consensus for the given height with the given parameters.
    StartHeight(Ctx::Height, HeightParams<Ctx>),

    /// An event received from the network layer.
    NetworkEvent(NetworkEvent<Ctx>),

    /// A previously scheduled timeout has elapsed.
    TimeoutElapsed(TimeoutElapsed<Timeout>),

    /// The host has produced a value for this node to propose.
    ProposeValue(LocallyProposedValue<Ctx>),

    /// A proposed value has been received, together with its origin
    /// (consensus or sync).
    ReceivedProposedValue(ProposedValue<Ctx>, ValueOrigin),

    /// A value response obtained via sync must be processed.
    ProcessSyncResponse(CoreValueResponse<Ctx>),

    /// Restart consensus at the given height. Unlike [`Msg::StartHeight`],
    /// this resets the WAL instead of replaying it.
    RestartHeight(Ctx::Height, HeightParams<Ctx>),

    /// Request a dump of the current consensus state; the reply is `None`
    /// when consensus has not been started yet.
    DumpState(RpcReplyPort<Option<StateDump<Ctx>>>),
}
128
129impl<Ctx: Context> fmt::Display for Msg<Ctx> {
130 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131 match self {
132 Msg::StartHeight(height, params) => {
133 write!(f, "StartHeight(height={height} params={params:?})")
134 }
135 Msg::NetworkEvent(event) => match event {
136 NetworkEvent::Proposal(_, proposal) => write!(
137 f,
138 "NetworkEvent(Proposal height={} round={})",
139 proposal.height(),
140 proposal.round()
141 ),
142 NetworkEvent::ProposalPart(_, part) => {
143 write!(f, "NetworkEvent(ProposalPart sequence={})", part.sequence)
144 }
145 NetworkEvent::Vote(_, vote) => write!(
146 f,
147 "NetworkEvent(Vote height={} round={})",
148 vote.height(),
149 vote.round()
150 ),
151 _ => write!(f, "NetworkEvent"),
152 },
153 Msg::TimeoutElapsed(timeout) => write!(f, "TimeoutElapsed({})", timeout.display_key()),
154 Msg::ProposeValue(value) => write!(
155 f,
156 "ProposeValue(height={} round={})",
157 value.height, value.round
158 ),
159 Msg::ReceivedProposedValue(value, origin) => write!(
160 f,
161 "ReceivedProposedValue(height={} round={} origin={origin:?})",
162 value.height, value.round
163 ),
164 Msg::ProcessSyncResponse(response) => {
165 write!(
166 f,
167 "ProcessSyncResponse(peer={} height={} value={})",
168 response.peer, response.certificate.height, response.certificate.value_id
169 )
170 }
171 Msg::RestartHeight(height, params) => {
172 write!(f, "RestartHeight(height={height} params={params:?})")
173 }
174 Msg::DumpState(_) => write!(f, "DumpState"),
175 }
176 }
177}
178
179impl<Ctx: Context> From<NetworkEvent<Ctx>> for Msg<Ctx> {
180 fn from(event: NetworkEvent<Ctx>) -> Self {
181 Self::NetworkEvent(event)
182 }
183}
184
/// Input type fed to the core consensus state machine.
type ConsensusInput<Ctx> = malachitebft_core_consensus::Input<Ctx>;
186
187impl<Ctx: Context> From<TimeoutElapsed<Timeout>> for Msg<Ctx> {
188 fn from(msg: TimeoutElapsed<Timeout>) -> Self {
189 Msg::TimeoutElapsed(msg)
190 }
191}
192
/// Timer scheduler keyed by consensus [`Timeout`]s.
type Timers = TimerScheduler<Timeout>;
194
/// Lifecycle phase of the consensus actor.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum Phase {
    /// Initial phase, set when the actor state is created.
    Unstarted,
    /// The network is listening and the host has been told that consensus
    /// is ready.
    Ready,
    /// A height has been started and consensus is processing inputs.
    Running,
    /// WAL entries are being replayed; WAL appends and flushes are skipped
    /// while in this phase.
    Recovering,
}
202
/// Size passed to `MessageBuffer::new` whenever the message buffer is
/// (re)created — presumably the maximum number of buffered messages;
/// TODO confirm against `MessageBuffer`.
const MAX_BUFFER_SIZE: usize = 1024;
206
/// Mutable state of the consensus actor.
pub struct State<Ctx: Context> {
    /// Scheduler for consensus timeouts.
    timers: Timers,

    /// Timeout durations in effect for the current height.
    timeouts: Ctx::Timeouts,

    /// Core consensus state; `None` until the first height is started.
    consensus: Option<ConsensusState<Ctx>>,

    /// Peers currently connected, as reported by the network layer.
    connected_peers: BTreeSet<PeerId>,

    /// Current lifecycle phase of the actor.
    phase: Phase,

    /// Messages buffered for later replay once a height has been started.
    msg_buffer: MessageBuffer<Ctx>,
}
228
229impl<Ctx> State<Ctx>
230where
231 Ctx: Context,
232{
233 pub fn height(&self) -> Ctx::Height {
234 self.consensus
235 .as_ref()
236 .map(|c| c.height())
237 .unwrap_or_default()
238 }
239
240 pub fn round(&self) -> Round {
241 self.consensus
242 .as_ref()
243 .map(|c| c.round())
244 .unwrap_or(Round::Nil)
245 }
246
247 fn set_phase(&mut self, phase: Phase) {
248 if self.phase != phase {
249 info!(prev = ?self.phase, new = ?phase, "Phase transition");
250 self.phase = phase;
251 }
252 }
253}
254
/// Subset of [`State`] handed to the effect handler while the core consensus
/// state is mutably borrowed by the input processing.
struct HandlerState<'a, Ctx: Context> {
    /// Phase the actor was in when the input started being processed.
    phase: Phase,
    /// Timer scheduler, used to start and cancel timeouts.
    timers: &'a mut Timers,
    /// Copy of the timeout durations for the current height.
    timeouts: Ctx::Timeouts,
}
260
261impl<Ctx> Consensus<Ctx>
262where
263 Ctx: Context,
264{
265 #[allow(clippy::too_many_arguments)]
266 pub async fn spawn(
267 ctx: Ctx,
268 params: ConsensusParams<Ctx>,
269 consensus_config: ConsensusConfig,
270 signing_provider: Box<dyn SigningProvider<Ctx>>,
271 network: NetworkRef<Ctx>,
272 host: HostRef<Ctx>,
273 wal: WalRef<Ctx>,
274 sync: Arc<OutputPort<SyncMsg<Ctx>>>,
275 metrics: Metrics,
276 tx_event: TxEvent<Ctx>,
277 span: tracing::Span,
278 ) -> Result<ActorRef<Msg<Ctx>>, ractor::SpawnErr> {
279 let node = Self {
280 ctx,
281 params,
282 consensus_config,
283 signing_provider,
284 network,
285 host,
286 wal,
287 sync,
288 metrics,
289 tx_event,
290 span,
291 };
292
293 let (actor_ref, _) = Actor::spawn(None, node, ()).await?;
294 Ok(actor_ref)
295 }
296
    /// Feeds `input` to the core consensus state machine and handles every
    /// effect it yields through [`Self::handle_effect`].
    ///
    /// # Panics
    ///
    /// Panics if consensus has not been started yet, i.e. `state.consensus`
    /// is `None`.
    async fn process_input(
        &self,
        myself: &ActorRef<Msg<Ctx>>,
        state: &mut State<Ctx>,
        input: ConsensusInput<Ctx>,
    ) -> Result<(), ConsensusError<Ctx>> {
        malachitebft_core_consensus::process!(
            input: input,
            state: state.consensus.as_mut().expect("Consensus not started"),
            metrics: &self.metrics,
            with: effect => {
                // Only the parts of the actor state that the effect handler
                // needs are passed along, since `state.consensus` is already
                // mutably borrowed above.
                let handler_state = HandlerState {
                    phase: state.phase,
                    timers: &mut state.timers,
                    timeouts: state.timeouts,
                };

                self.handle_effect(myself, handler_state, effect).await
            }
        )
    }
318
319 #[async_recursion]
320 async fn process_buffered_msgs(
321 &self,
322 myself: &ActorRef<Msg<Ctx>>,
323 state: &mut State<Ctx>,
324 is_restart: bool,
325 ) {
326 if state.msg_buffer.is_empty() {
327 return;
328 }
329
330 if is_restart {
331 state.msg_buffer = MessageBuffer::new(MAX_BUFFER_SIZE);
332 }
333
334 info!(count = %state.msg_buffer.len(), "Replaying buffered messages");
335
336 while let Some(msg) = state.msg_buffer.pop() {
337 debug!("Replaying buffered message: {msg}");
338
339 if let Err(e) = self.handle_msg(myself.clone(), state, msg).await {
340 error!("Error when handling buffered message: {e:?}");
341 }
342 }
343 }
344
    /// Handles a single message addressed to the consensus actor.
    ///
    /// Most failures of the consensus state machine are logged and swallowed
    /// so that the actor keeps running.
    ///
    /// # Errors
    ///
    /// Returns an error when a height is started with an empty validator
    /// set, or when a request cannot be forwarded to the host.
    async fn handle_msg(
        &self,
        myself: ActorRef<Msg<Ctx>>,
        state: &mut State<Ctx>,
        msg: Msg<Ctx>,
    ) -> Result<(), ActorProcessingErr> {
        // Must be computed before `msg` is consumed by the match below.
        let is_restart = matches!(msg, Msg::RestartHeight(_, _));

        match msg {
            Msg::StartHeight(height, params) | Msg::RestartHeight(height, params) => {
                // Refuse to start a height with an empty validator set.
                if params.validator_set.count() == 0 {
                    return Err(eyre!("Validator set for height {height} is empty").into());
                }

                // Create the consensus state the first time a height is started.
                if state.consensus.is_none() {
                    state.consensus = Some(ConsensusState::new(
                        self.ctx.clone(),
                        height,
                        params.validator_set.clone(),
                        self.params.clone(),
                        self.consensus_config.queue_capacity,
                    ));
                }

                self.tx_event
                    .send(|| Event::StartedHeight(height, is_restart));

                // Push the validator set to the network layer; a failure here
                // is only logged.
                if let Err(e) = self
                    .network
                    .cast(NetworkMsg::UpdateValidatorSet(params.validator_set.clone()))
                {
                    error!(%height, "Error pushing validator set to network layer: {e}");
                }

                // On restart the WAL is reset and nothing is replayed;
                // otherwise the entries recorded for this height are fetched.
                // NOTE(review): `hang_on_failure` is defined outside this view —
                // presumably it never returns when the wrapped future fails.
                let wal_entries = if is_restart {
                    hang_on_failure(self.wal_reset(height), |e| {
                        error!(%height, "Error when resetting WAL: {e}");
                        error!(%height, "Consensus may be in an inconsistent state after WAL reset failure");
                    })
                    .await;

                    vec![]
                } else {
                    hang_on_failure(self.wal_fetch(height), |e| {
                        error!(%height, "Error when fetching WAL entries: {e}");
                        error!(%height, "Consensus may be in an inconsistent state after WAL fetch failure");
                    })
                    .await
                };

                // While in the `Recovering` phase, WAL appends and flushes
                // are skipped.
                if !wal_entries.is_empty() {
                    state.set_phase(Phase::Recovering);
                }

                // Use the timeouts supplied for this height.
                state.timeouts = params.timeouts;

                // Tell the consensus state machine to start the height.
                let result = self
                    .process_input(
                        &myself,
                        state,
                        ConsensusInput::StartHeight(
                            height,
                            params.validator_set,
                            is_restart,
                            params.target_time,
                        ),
                    )
                    .await;

                if let Err(e) = result {
                    error!(%height, "Error when starting height: {e}");
                }

                // Replay the WAL entries, if any, after the height was started.
                if !wal_entries.is_empty() {
                    hang_on_failure(self.wal_replay(&myself, state, height, wal_entries), |e| {
                        error!(%height, "Error when replaying WAL: {e}");
                        error!(%height, "Consensus may be in an inconsistent state after WAL replay failure");
                    })
                    .await;
                }

                state.set_phase(Phase::Running);

                // Notify the sync actor that a new height has started.
                let start_type = HeightStartType::from_is_restart(is_restart);
                self.sync.send(SyncMsg::StartedHeight(height, start_type));

                // Handle the messages that were buffered in the meantime.
                self.process_buffered_msgs(&myself, state, is_restart).await;

                Ok(())
            }

            Msg::ProposeValue(value) => {
                let result = self
                    .process_input(&myself, state, ConsensusInput::Propose(value.clone()))
                    .await;

                if let Err(e) = result {
                    error!(
                        height = %value.height, round = %value.round,
                        "Error when processing ProposeValue message: {e}"
                    );
                }

                // The event is emitted even if processing the value failed.
                self.tx_event.send(|| Event::ProposedValue(value));

                Ok(())
            }

            Msg::NetworkEvent(event) => {
                match event {
                    NetworkEvent::Listening(address) => {
                        info!(%address, "Listening");

                        // Only the first `Listening` event signals readiness
                        // to the host; its reply is forwarded to ourselves as
                        // a `StartHeight` message.
                        if state.phase == Phase::Unstarted {
                            state.set_phase(Phase::Ready);

                            self.host.call_and_forward(
                                |reply_to| HostMsg::ConsensusReady { reply_to },
                                &myself,
                                |(height, params)| ConsensusMsg::StartHeight(height, params),
                                None,
                            )?;
                        }
                    }

                    NetworkEvent::PeerConnected(peer_id) => {
                        // Ignore peers that are already known.
                        if !state.connected_peers.insert(peer_id) {
                            return Ok(());
                        }

                        info!(%peer_id, total = %state.connected_peers.len(), "Connected to peer");

                        self.metrics.connected_peers.inc();
                    }

                    NetworkEvent::PeerDisconnected(peer_id) => {
                        info!(%peer_id, "Disconnected from peer");

                        // Only decrement the gauge for peers that were tracked.
                        if state.connected_peers.remove(&peer_id) {
                            self.metrics.connected_peers.dec();
                        }
                    }

                    NetworkEvent::Vote(from, vote) => {
                        self.tx_event
                            .send(|| Event::Received(SignedConsensusMsg::Vote(vote.clone())));

                        if let Err(e) = self
                            .process_input(&myself, state, ConsensusInput::Vote(vote))
                            .await
                        {
                            error!(%from, "Error when processing vote: {e}");
                        }
                    }

                    NetworkEvent::Proposal(from, proposal) => {
                        self.tx_event.send(|| {
                            Event::Received(SignedConsensusMsg::Proposal(proposal.clone()))
                        });

                        // Proposal messages are not expected when values are
                        // transmitted as parts only.
                        if self.params.value_payload.parts_only() {
                            error!(%from, "Properly configured peer should never send proposal messages in BlockPart mode");
                            return Ok(());
                        }

                        if let Err(e) = self
                            .process_input(&myself, state, ConsensusInput::Proposal(proposal))
                            .await
                        {
                            error!(%from, "Error when processing proposal: {e}");
                        }
                    }

                    NetworkEvent::PolkaCertificate(from, certificate) => {
                        if let Err(e) = self
                            .process_input(
                                &myself,
                                state,
                                ConsensusInput::PolkaCertificate(certificate),
                            )
                            .await
                        {
                            error!(%from, "Error when processing polka certificate: {e}");
                        }
                    }

                    NetworkEvent::RoundCertificate(from, certificate) => {
                        if let Err(e) = self
                            .process_input(
                                &myself,
                                state,
                                ConsensusInput::RoundCertificate(certificate),
                            )
                            .await
                        {
                            error!(%from, "Error when processing round certificate: {e}");
                        }
                    }

                    NetworkEvent::ProposalPart(from, part) => {
                        // Proposal parts are not expected when values are
                        // transmitted within proposal messages only.
                        if self.params.value_payload.proposal_only() {
                            error!(%from, "Properly configured peer should never send proposal part messages in Proposal mode");
                            return Ok(());
                        }

                        // Hand the part to the host; its reply comes back to
                        // us as a `ReceivedProposedValue` message.
                        self.host
                            .call_and_forward(
                                |reply_to| HostMsg::ReceivedProposalPart {
                                    from,
                                    part,
                                    reply_to,
                                },
                                &myself,
                                move |value| {
                                    Msg::ReceivedProposedValue(value, ValueOrigin::Consensus)
                                },
                                None,
                            )
                            .map_err(|e| {
                                eyre!("Error when forwarding proposal parts to host: {e}")
                            })?;
                    }

                    // All other network events are ignored here.
                    _ => {}
                }

                Ok(())
            }

            Msg::TimeoutElapsed(elapsed) => {
                // The timer scheduler may swallow the notification, in which
                // case there is nothing to do.
                let Some(timeout) = state.timers.intercept_timer_msg(elapsed) else {
                    return Ok(());
                };

                if let Err(e) = self.timeout_elapsed(&myself, state, timeout).await {
                    error!("Error when processing TimeoutElapsed message: {e:?}");
                }

                Ok(())
            }

            Msg::ReceivedProposedValue(value, origin) => {
                self.tx_event
                    .send(|| Event::ReceivedProposedValue(value.clone(), origin));

                let result = self
                    .process_input(&myself, state, ConsensusInput::ProposedValue(value, origin))
                    .await;

                if let Err(e) = result {
                    error!("Error when processing ReceivedProposedValue message: {e}");
                }

                Ok(())
            }

            Msg::ProcessSyncResponse(response) => {
                // Capture the fields needed for logging before `response` is
                // moved into the consensus input.
                let height = response.certificate.height;
                let round = response.certificate.round;
                let value = response.certificate.value_id.clone();
                let peer = response.peer;

                debug!(
                    %height, %round, %value, %peer,
                    "Processing sync response"
                );

                if let Err(e) = self
                    .process_input(&myself, state, ConsensusInput::SyncValueResponse(response))
                    .await
                {
                    error!(
                        %height, %round, %value, %peer,
                        "Failed to process sync response: {e:?}"
                    );
                }

                Ok(())
            }

            Msg::DumpState(reply_to) => {
                // Reply with `None` when consensus has not been started yet.
                let state_dump = if let Some(consensus) = &state.consensus {
                    info!(
                        height = %consensus.height(),
                        round = %consensus.round(),
                        "Dumping consensus state"
                    );

                    Some(StateDump::new(consensus))
                } else {
                    info!("Dumping consensus state: not started");
                    None
                };

                if let Err(e) = reply_to.send(state_dump) {
                    error!("Failed to reply with state dump: {e}");
                }

                Ok(())
            }
        }
    }
660
661 async fn timeout_elapsed(
662 &self,
663 myself: &ActorRef<Msg<Ctx>>,
664 state: &mut State<Ctx>,
665 timeout: Timeout,
666 ) -> Result<(), ConsensusError<Ctx>> {
667 state.timers.cancel(&timeout);
669
670 if matches!(
672 timeout.kind,
673 TimeoutKind::Prevote | TimeoutKind::Precommit | TimeoutKind::Rebroadcast
674 ) {
675 info!(step = ?timeout.kind, "Timeout elapsed");
676
677 state.consensus.as_ref().inspect(|consensus| {
678 consensus.print_state();
679 });
680 }
681
682 self.process_input(myself, state, ConsensusInput::TimeoutElapsed(timeout))
684 .await?;
685
686 Ok(())
687 }
688
689 async fn wal_reset(&self, height: Ctx::Height) -> Result<(), ActorProcessingErr> {
690 let result = ractor::call!(self.wal, WalMsg::Reset, height);
691
692 match result {
693 Ok(Ok(())) => {
694 }
696 Ok(Err(e)) => {
697 error!(%height, "Failed to reset WAL: {e}");
698 return Err(e
699 .wrap_err(format!("Failed to reset WAL for height {height}"))
700 .into());
701 }
702 Err(e) => {
703 error!(%height, "Failed to send Reset command to WAL actor: {e}");
704 return Err(eyre!(e)
705 .wrap_err(format!(
706 "Failed to send Reset command to WAL actor for height {height}"
707 ))
708 .into());
709 }
710 }
711
712 Ok(())
713 }
714
715 async fn wal_fetch(
716 &self,
717 height: Ctx::Height,
718 ) -> Result<Vec<io::Result<WalEntry<Ctx>>>, ActorProcessingErr> {
719 let result = ractor::call!(self.wal, WalMsg::StartedHeight, height)?;
720
721 match result {
722 Ok(entries) if entries.is_empty() => {
723 debug!(%height, "No WAL entries to replay");
724
725 Ok(Vec::new())
727 }
728
729 Ok(entries) => {
730 info!("Found {} WAL entries", entries.len());
731
732 Ok(entries)
733 }
734
735 Err(e) => {
736 error!(%height, "Error when notifying WAL of started height: {e}");
737
738 self.tx_event.send(|| Event::WalResetError(Arc::new(e)));
739
740 Err(eyre!("Failed to fetch WAL entries for height {height}").into())
741 }
742 }
743 }
744
745 async fn wal_replay(
746 &self,
747 myself: &ActorRef<Msg<Ctx>>,
748 state: &mut State<Ctx>,
749 height: Ctx::Height,
750 entries: Vec<io::Result<WalEntry<Ctx>>>,
751 ) -> Result<(), Arc<ConsensusError<Ctx>>> {
752 use SignedConsensusMsg::*;
753
754 assert_eq!(state.phase, Phase::Recovering);
755
756 if entries.is_empty() {
757 return Ok(());
758 }
759
760 info!("Replaying {} WAL entries", entries.len());
761
762 self.tx_event
763 .send(|| Event::WalReplayBegin(height, entries.len()));
764
765 for entry in entries {
767 let entry = match entry {
768 Ok(entry) => entry,
769 Err(e) => {
770 error!("Corrupted WAL entry encountered: {e}");
771
772 let error = Arc::new(e);
773
774 self.tx_event
776 .send(|| Event::WalCorrupted(Arc::clone(&error)));
777
778 return Err(Arc::new(ConsensusError::WalCorrupted(error)));
779 }
780 };
781
782 self.tx_event.send(|| Event::WalReplayEntry(entry.clone()));
783
784 match entry {
785 WalEntry::ConsensusMsg(Vote(vote)) => {
786 info!("Replaying vote: {vote:?}");
787
788 if let Err(e) = self
789 .process_input(myself, state, ConsensusInput::Vote(vote))
790 .await
791 {
792 error!("Error when replaying vote: {e}");
793
794 let e = Arc::new(e);
795 self.tx_event.send({
796 let e = Arc::clone(&e);
797 || Event::WalReplayError(e)
798 });
799
800 return Err(e);
801 }
802 }
803
804 WalEntry::ConsensusMsg(Proposal(proposal)) => {
805 info!("Replaying proposal: {proposal:?}");
806
807 if let Err(e) = self
808 .process_input(myself, state, ConsensusInput::Proposal(proposal))
809 .await
810 {
811 error!("Error when replaying Proposal: {e}");
812
813 let e = Arc::new(e);
814 self.tx_event.send({
815 let e = Arc::clone(&e);
816 || Event::WalReplayError(e)
817 });
818
819 return Err(e);
820 }
821 }
822
823 WalEntry::Timeout(timeout) => {
824 info!("Replaying timeout: {timeout:?}");
825
826 if let Err(e) = self.timeout_elapsed(myself, state, timeout).await {
827 error!("Error when replaying TimeoutElapsed: {e}");
828
829 let e = Arc::new(e);
830 self.tx_event.send({
831 let e = Arc::clone(&e);
832 || Event::WalReplayError(e)
833 });
834
835 return Err(e);
836 }
837 }
838
839 WalEntry::ProposedValue(value) => {
840 info!("Replaying proposed value: {value:?}");
841
842 if let Err(e) = self
843 .process_input(
844 myself,
845 state,
846 ConsensusInput::ProposedValue(value, ValueOrigin::Consensus),
847 )
848 .await
849 {
850 error!("Error when replaying LocallyProposedValue: {e}");
851
852 let e = Arc::new(e);
853 self.tx_event.send({
854 let e = Arc::clone(&e);
855 || Event::WalReplayError(e)
856 });
857
858 return Err(e);
859 }
860 }
861 }
862 }
863
864 self.tx_event.send(|| Event::WalReplayDone(state.height()));
865
866 Ok(())
867 }
868
869 fn get_value(
870 &self,
871 myself: &ActorRef<Msg<Ctx>>,
872 height: Ctx::Height,
873 round: Round,
874 timeout: Duration,
875 ) -> Result<(), ActorProcessingErr> {
876 self.host.call_and_forward(
879 |reply_to| HostMsg::GetValue {
880 height,
881 round,
882 timeout,
883 reply_to,
884 },
885 myself,
886 Msg::<Ctx>::ProposeValue,
887 None,
888 )?;
889
890 Ok(())
891 }
892
893 async fn extend_vote(
894 &self,
895 height: Ctx::Height,
896 round: Round,
897 value_id: ValueId<Ctx>,
898 ) -> Result<Option<Ctx::Extension>, ActorProcessingErr> {
899 ractor::call!(self.host, |reply_to| HostMsg::ExtendVote {
900 height,
901 round,
902 value_id,
903 reply_to
904 })
905 .map_err(|e| eyre!("Failed to get earliest block height: {e:?}").into())
906 }
907
908 async fn verify_vote_extension(
909 &self,
910 height: Ctx::Height,
911 round: Round,
912 value_id: ValueId<Ctx>,
913 extension: Ctx::Extension,
914 ) -> Result<Result<(), VoteExtensionError>, ActorProcessingErr> {
915 ractor::call!(self.host, |reply_to| HostMsg::VerifyVoteExtension {
916 height,
917 round,
918 value_id,
919 extension,
920 reply_to
921 })
922 .map_err(|e| eyre!("Failed to verify vote extension: {e:?}").into())
923 }
924
925 async fn wal_append(
926 &self,
927 height: Ctx::Height,
928 entry: WalEntry<Ctx>,
929 phase: Phase,
930 ) -> Result<(), ActorProcessingErr> {
931 if phase == Phase::Recovering {
932 return Ok(());
933 }
934
935 let result = ractor::call!(self.wal, WalMsg::Append, height, entry);
936
937 match result {
938 Ok(Ok(())) => {
939 }
941 Ok(Err(e)) => {
942 error!("Failed to append entry to WAL: {e}");
943 }
944 Err(e) => {
945 error!("Failed to send Append command to WAL actor: {e}");
946 }
947 }
948
949 Ok(())
950 }
951
952 async fn wal_flush(&self, phase: Phase) -> Result<(), ActorProcessingErr> {
953 if phase == Phase::Recovering {
954 return Ok(());
955 }
956
957 let result = ractor::call!(self.wal, WalMsg::Flush);
958
959 match result {
960 Ok(Ok(())) => {
961 }
963 Ok(Err(e)) => {
964 error!("Failed to flush WAL to disk: {e}");
965 }
966 Err(e) => {
967 error!("Failed to send Flush command to WAL: {e}");
968 }
969 }
970
971 Ok(())
972 }
973
974 async fn handle_effect(
975 &self,
976 myself: &ActorRef<Msg<Ctx>>,
977 state: HandlerState<'_, Ctx>,
978 effect: Effect<Ctx>,
979 ) -> Result<Resume<Ctx>, ActorProcessingErr> {
980 match effect {
981 Effect::CancelAllTimeouts(r) => {
982 state.timers.cancel_all();
983 Ok(r.resume_with(()))
984 }
985
986 Effect::CancelTimeout(timeout, r) => {
987 state.timers.cancel(&timeout);
988 Ok(r.resume_with(()))
989 }
990
991 Effect::ScheduleTimeout(timeout, r) => {
992 let duration = state.timeouts.duration_for(timeout);
993 state.timers.start_timer(timeout, duration);
994
995 Ok(r.resume_with(()))
996 }
997
998 Effect::StartRound(height, round, proposer, role, r) => {
999 self.wal_flush(state.phase).await?;
1000
1001 let undecided_values =
1002 ractor::call!(self.host, |reply_to| HostMsg::StartedRound {
1003 height,
1004 round,
1005 proposer: proposer.clone(),
1006 role,
1007 reply_to,
1008 })?;
1009
1010 for value in undecided_values {
1011 let _ = myself.cast(Msg::ReceivedProposedValue(value, ValueOrigin::Consensus));
1012 }
1013
1014 self.tx_event
1015 .send(|| Event::StartedRound(height, round, proposer, role));
1016
1017 Ok(r.resume_with(()))
1018 }
1019
1020 Effect::SignProposal(proposal, r) => {
1021 let start = Instant::now();
1022
1023 let signed_proposal = self.signing_provider.sign_proposal(proposal).await?;
1024
1025 self.metrics
1026 .signature_signing_time
1027 .observe(start.elapsed().as_secs_f64());
1028
1029 Ok(r.resume_with(signed_proposal))
1030 }
1031
1032 Effect::SignVote(vote, r) => {
1033 let start = Instant::now();
1034
1035 let signed_vote = self.signing_provider.sign_vote(vote).await?;
1036
1037 self.metrics
1038 .signature_signing_time
1039 .observe(start.elapsed().as_secs_f64());
1040
1041 Ok(r.resume_with(signed_vote))
1042 }
1043
1044 Effect::VerifySignature(msg, pk, r) => {
1045 use malachitebft_core_consensus::ConsensusMsg as Msg;
1046
1047 let start = Instant::now();
1048
1049 let result = match msg.message {
1050 Msg::Vote(v) => {
1051 self.signing_provider
1052 .verify_signed_vote(&v, &msg.signature, &pk)
1053 .await?
1054 }
1055 Msg::Proposal(p) => {
1056 self.signing_provider
1057 .verify_signed_proposal(&p, &msg.signature, &pk)
1058 .await?
1059 }
1060 };
1061
1062 self.metrics
1063 .signature_verification_time
1064 .observe(start.elapsed().as_secs_f64());
1065
1066 Ok(r.resume_with(result.is_valid()))
1067 }
1068
1069 Effect::VerifyCommitCertificate(certificate, validator_set, thresholds, r) => {
1070 let result = self
1071 .signing_provider
1072 .verify_commit_certificate(&self.ctx, &certificate, &validator_set, thresholds)
1073 .await;
1074
1075 Ok(r.resume_with(result))
1076 }
1077
1078 Effect::VerifyPolkaCertificate(certificate, validator_set, thresholds, r) => {
1079 let result = self
1080 .signing_provider
1081 .verify_polka_certificate(&self.ctx, &certificate, &validator_set, thresholds)
1082 .await;
1083
1084 Ok(r.resume_with(result))
1085 }
1086
1087 Effect::VerifyRoundCertificate(certificate, validator_set, thresholds, r) => {
1088 let result = self
1089 .signing_provider
1090 .verify_round_certificate(&self.ctx, &certificate, &validator_set, thresholds)
1091 .await;
1092
1093 Ok(r.resume_with(result))
1094 }
1095
1096 Effect::ExtendVote(height, round, value_id, r) => {
1097 if let Some(extension) = self.extend_vote(height, round, value_id).await? {
1098 let signed_extension = self
1099 .signing_provider
1100 .sign_vote_extension(extension)
1101 .await
1102 .inspect_err(|e| {
1103 error!("Failed to sign vote extension: {e}");
1104 })
1105 .ok(); Ok(r.resume_with(signed_extension))
1108 } else {
1109 Ok(r.resume_with(None))
1110 }
1111 }
1112
1113 Effect::VerifyVoteExtension(height, round, value_id, signed_extension, pk, r) => {
1114 let result = self
1115 .signing_provider
1116 .verify_signed_vote_extension(
1117 &signed_extension.message,
1118 &signed_extension.signature,
1119 &pk,
1120 )
1121 .await?;
1122
1123 if result.is_invalid() {
1124 return Ok(r.resume_with(Err(VoteExtensionError::InvalidSignature)));
1125 }
1126
1127 let result = self
1128 .verify_vote_extension(height, round, value_id, signed_extension.message)
1129 .await?;
1130
1131 Ok(r.resume_with(result))
1132 }
1133
1134 Effect::PublishConsensusMsg(msg, r) => {
1135 self.wal_flush(state.phase).await?;
1138
1139 self.tx_event.send(|| Event::Published(msg.clone()));
1141
1142 self.network
1143 .cast(NetworkMsg::PublishConsensusMsg(msg))
1144 .map_err(|e| eyre!("Error when broadcasting consensus message: {e:?}"))?;
1145
1146 Ok(r.resume_with(()))
1147 }
1148
1149 Effect::PublishLivenessMsg(msg, r) => {
1150 match msg {
1151 LivenessMsg::Vote(ref msg) => {
1152 self.tx_event.send(|| Event::RepublishVote(msg.clone()));
1153 }
1154 LivenessMsg::PolkaCertificate(ref certificate) => {
1155 self.tx_event
1156 .send(|| Event::PolkaCertificate(certificate.clone()));
1157 }
1158 LivenessMsg::SkipRoundCertificate(ref certificate) => {
1159 self.tx_event
1160 .send(|| Event::SkipRoundCertificate(certificate.clone()));
1161 }
1162 }
1163
1164 self.network
1165 .cast(NetworkMsg::PublishLivenessMsg(msg))
1166 .map_err(|e| eyre!("Error when broadcasting liveness message: {e:?}"))?;
1167
1168 Ok(r.resume_with(()))
1169 }
1170
1171 Effect::RepublishVote(msg, r) => {
1172 self.tx_event.send(|| Event::RepublishVote(msg.clone()));
1174
1175 self.network
1176 .cast(NetworkMsg::PublishLivenessMsg(LivenessMsg::Vote(msg)))
1177 .map_err(|e| eyre!("Error when rebroadcasting vote message: {e:?}"))?;
1178
1179 Ok(r.resume_with(()))
1180 }
1181
1182 Effect::RepublishRoundCertificate(certificate, r) => {
1183 self.tx_event
1185 .send(|| Event::RebroadcastRoundCertificate(certificate.clone()));
1186
1187 self.network
1188 .cast(NetworkMsg::PublishLivenessMsg(
1189 LivenessMsg::SkipRoundCertificate(certificate),
1190 ))
1191 .map_err(|e| {
1192 eyre!("Error when rebroadcasting round certificate message: {e:?}")
1193 })?;
1194
1195 Ok(r.resume_with(()))
1196 }
1197
1198 Effect::GetValue(height, round, timeout, r) => {
1199 let timeout_duration = state.timeouts.duration_for(timeout);
1200
1201 self.get_value(myself, height, round, timeout_duration)
1202 .map_err(|e| {
1203 eyre!("Error when asking application for value to propose: {e:?}")
1204 })?;
1205
1206 Ok(r.resume_with(()))
1207 }
1208
1209 Effect::RestreamProposal(height, round, valid_round, address, value_id, r) => {
1210 self.host
1211 .cast(HostMsg::RestreamValue {
1212 height,
1213 round,
1214 valid_round,
1215 address,
1216 value_id,
1217 })
1218 .map_err(|e| eyre!("Error when sending decided value to host: {e:?}"))?;
1219
1220 Ok(r.resume_with(()))
1221 }
1222
1223 Effect::Decide(certificate, extensions, r) => {
1224 assert!(!certificate.commit_signatures.is_empty());
1225
1226 self.wal_flush(state.phase).await?;
1228
1229 self.tx_event.send(|| Event::Decided {
1231 commit_certificate: certificate.clone(),
1232 });
1233
1234 let height = certificate.height;
1235
1236 self.host
1239 .cast(HostMsg::Decided {
1240 certificate,
1241 extensions,
1242 })
1243 .map_err(|e| eyre!("Error when casting decided value to host: {e:?}"))?;
1244
1245 self.sync.send(SyncMsg::Decided(height));
1247
1248 Ok(r.resume_with(()))
1249 }
1250
1251 Effect::Finalize(certificate, extensions, evidence, r) => {
1252 assert!(!certificate.commit_signatures.is_empty());
1253
1254 let proposal_evidence_count = evidence
1256 .proposals
1257 .iter()
1258 .map(|addr| evidence.proposals.get(addr).map_or(0, |v| v.len()))
1259 .sum::<usize>();
1260 let vote_evidence_count = evidence
1261 .votes
1262 .iter()
1263 .map(|addr| evidence.votes.get(addr).map_or(0, |v| v.len()))
1264 .sum::<usize>();
1265 if proposal_evidence_count > 0 {
1266 self.metrics
1267 .equivocation_proposals
1268 .inc_by(proposal_evidence_count as u64);
1269 }
1270 if vote_evidence_count > 0 {
1271 self.metrics
1272 .equivocation_votes
1273 .inc_by(vote_evidence_count as u64);
1274 }
1275
1276 self.tx_event.send(|| Event::Finalized {
1278 commit_certificate: certificate.clone(),
1279 evidence: evidence.clone(),
1280 });
1281
1282 info!(
1283 height = %certificate.height,
1284 round = %certificate.round,
1285 total_signatures = certificate.commit_signatures.len(),
1286 "Height finalized with extended certificate"
1287 );
1288
1289 self.host
1291 .call_and_forward(
1292 |reply_to| HostMsg::Finalized {
1293 certificate,
1294 extensions,
1295 evidence,
1296 reply_to,
1297 },
1298 myself,
1299 |next| match next {
1300 Next::Start(h, params) => Msg::StartHeight(h, params),
1301 Next::Restart(h, params) => Msg::RestartHeight(h, params),
1302 },
1303 None,
1304 )
1305 .map_err(|e| eyre!("Error when sending finalized value to host: {e:?}"))?;
1306
1307 Ok(r.resume_with(()))
1308 }
1309
1310 Effect::InvalidSyncValue(peer, height, error, r) => {
1311 if let ConsensusError::InvalidCommitCertificate(certificate, e) = error {
1312 error!(
1313 %peer,
1314 %certificate.height,
1315 %certificate.round,
1316 "Invalid certificate received: {e}"
1317 );
1318
1319 self.sync
1320 .send(SyncMsg::InvalidValue(peer, certificate.height));
1321 } else {
1322 self.sync.send(SyncMsg::ValueProcessingError(peer, height));
1323 }
1324
1325 Ok(r.resume_with(()))
1326 }
1327
1328 Effect::ValidSyncValue(value, proposer, r) => {
1329 let certificate_height = value.certificate.height;
1330 let certificate_round = value.certificate.round;
1331
1332 let sync = Arc::clone(&self.sync);
1333
1334 self.host.call_and_forward(
1335 |reply_to| HostMsg::ProcessSyncedValue {
1336 height: certificate_height,
1337 round: certificate_round,
1338 proposer,
1339 value_bytes: value.value_bytes,
1340 reply_to,
1341 },
1342 myself,
1343 move |proposed| {
1344 if proposed.validity == Validity::Invalid
1345 || proposed.value.id() != value.certificate.value_id
1346 {
1347 sync.send(SyncMsg::InvalidValue(value.peer, certificate_height));
1348 }
1349
1350 Msg::<Ctx>::ReceivedProposedValue(proposed, ValueOrigin::Sync)
1351 },
1352 None,
1353 )?;
1354
1355 Ok(r.resume_with(()))
1356 }
1357
1358 Effect::WalAppend(height, entry, r) => {
1359 self.wal_append(height, entry, state.phase).await?;
1360 Ok(r.resume_with(()))
1361 }
1362 }
1363 }
1364}
1365
1366#[async_trait]
1367impl<Ctx> Actor for Consensus<Ctx>
1368where
1369 Ctx: Context,
1370{
1371 type Msg = Msg<Ctx>;
1372 type State = State<Ctx>;
1373 type Arguments = ();
1374
1375 #[tracing::instrument(
1376 name = "consensus",
1377 parent = &self.span,
1378 skip_all,
1379 )]
1380 async fn pre_start(
1381 &self,
1382 myself: ActorRef<Msg<Ctx>>,
1383 _args: (),
1384 ) -> Result<State<Ctx>, ActorProcessingErr> {
1385 info!("Consensus is starting");
1386
1387 self.network
1388 .cast(NetworkMsg::Subscribe(Box::new(myself.clone())))?;
1389
1390 Ok(State {
1391 timers: Timers::new(Box::new(myself)),
1392 timeouts: Ctx::Timeouts::default(),
1393 consensus: None,
1394 connected_peers: BTreeSet::new(),
1395 phase: Phase::Unstarted,
1396 msg_buffer: MessageBuffer::new(MAX_BUFFER_SIZE),
1397 })
1398 }
1399
1400 #[tracing::instrument(
1401 name = "consensus",
1402 parent = &self.span,
1403 skip_all,
1404 fields(height = %state.height(), round = %state.round())
1405 )]
1406 async fn post_start(
1407 &self,
1408 _myself: ActorRef<Msg<Ctx>>,
1409 state: &mut State<Ctx>,
1410 ) -> Result<(), ActorProcessingErr> {
1411 info!("Consensus has started");
1412
1413 state.timers.cancel_all();
1414 Ok(())
1415 }
1416
1417 #[tracing::instrument(
1418 name = "consensus",
1419 parent = &self.span,
1420 skip_all,
1421 fields(
1422 height = %span_height(state.height(), &msg),
1423 round = %span_round(state.round(), &msg)
1424 )
1425 )]
1426 async fn handle(
1427 &self,
1428 myself: ActorRef<Msg<Ctx>>,
1429 msg: Msg<Ctx>,
1430 state: &mut State<Ctx>,
1431 ) -> Result<(), ActorProcessingErr> {
1432 if state.phase != Phase::Running && should_buffer(&msg) {
1433 let _span = error_span!("buffer", phase = ?state.phase).entered();
1434 state.msg_buffer.buffer(msg);
1435 return Ok(());
1436 }
1437
1438 if let Err(e) = self.handle_msg(myself.clone(), state, msg).await {
1439 error!("Error when handling message: {e:?}");
1440 }
1441
1442 Ok(())
1443 }
1444
1445 #[tracing::instrument(
1446 name = "consensus",
1447 parent = &self.span,
1448 skip_all,
1449 fields(
1450 height = %state.height(),
1451 round = %state.round()
1452 )
1453 )]
1454 async fn post_stop(
1455 &self,
1456 _myself: ActorRef<Self::Msg>,
1457 state: &mut State<Ctx>,
1458 ) -> Result<(), ActorProcessingErr> {
1459 info!("Consensus has stopped");
1460 state.timers.cancel_all();
1461 Ok(())
1462 }
1463}
1464
1465fn should_buffer<Ctx: Context>(msg: &Msg<Ctx>) -> bool {
1466 !matches!(
1467 msg,
1468 Msg::StartHeight(..)
1469 | Msg::NetworkEvent(NetworkEvent::Listening(..))
1470 | Msg::NetworkEvent(NetworkEvent::PeerConnected(..))
1471 | Msg::NetworkEvent(NetworkEvent::PeerDisconnected(..))
1472 )
1473}
1474
1475fn span_height<Ctx: Context>(height: Ctx::Height, msg: &Msg<Ctx>) -> Ctx::Height {
1478 if let Msg::StartHeight(h, _) = msg {
1479 *h
1480 } else {
1481 height
1482 }
1483}
1484
1485fn span_round<Ctx: Context>(round: Round, msg: &Msg<Ctx>) -> Round {
1488 if let Msg::StartHeight(_, _) = msg {
1489 Round::new(0)
1490 } else {
1491 round
1492 }
1493}
1494
1495async fn hang_on_failure<A, E>(
1496 f: impl Future<Output = Result<A, E>>,
1497 on_error: impl FnOnce(E),
1498) -> A {
1499 match f.await {
1500 Ok(value) => value,
1501 Err(e) => {
1502 on_error(e);
1503 error!("Critical consensus failure, hanging to prevent safety violations. Manual intervention required!");
1504 hang().await
1505 }
1506 }
1507}
1508
/// Suspends the calling task forever: the returned future never resolves.
async fn hang() -> ! {
    // `pending` never completes, so the arm below is not expected to run.
    match pending::<()>().await {
        () => unreachable!(),
    }
}