Skip to main content

arc_malachitebft_engine/
consensus.rs

1use core::fmt;
2use std::collections::BTreeSet;
3use std::future::{pending, Future};
4use std::io;
5use std::sync::Arc;
6use std::time::Duration;
7
8use async_recursion::async_recursion;
9use async_trait::async_trait;
10use derive_where::derive_where;
11use eyre::eyre;
12use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
13use tokio::time::Instant;
14use tracing::{debug, error, error_span, info};
15
16use malachitebft_codec as codec;
17use malachitebft_config::ConsensusConfig;
18use malachitebft_core_consensus::{
19    Effect, LivenessMsg, PeerId, Resumable, Resume, SignedConsensusMsg, VoteExtensionError,
20};
21use malachitebft_core_types::{
22    Context, Proposal, Round, Timeout, TimeoutKind, Timeouts, ValidatorSet, Validity, Value,
23    ValueId, ValueOrigin, ValueResponse as CoreValueResponse, Vote,
24};
25use malachitebft_metrics::Metrics;
26use malachitebft_signing::{SigningProvider, SigningProviderExt};
27use malachitebft_sync::HeightStartType;
28
29use crate::host::{HeightParams, HostMsg, HostRef, LocallyProposedValue, Next, ProposedValue};
30use crate::network::{NetworkEvent, NetworkMsg, NetworkRef};
31use crate::sync::Msg as SyncMsg;
32use crate::util::events::{Event, TxEvent};
33use crate::util::msg_buffer::MessageBuffer;
34use crate::util::output_port::OutputPort;
35use crate::util::streaming::StreamMessage;
36use crate::util::timers::{TimeoutElapsed, TimerScheduler};
37use crate::wal::{Msg as WalMsg, WalEntry, WalRef};
38
39pub use malachitebft_core_consensus::Error as ConsensusError;
40pub use malachitebft_core_consensus::Params as ConsensusParams;
41pub use malachitebft_core_consensus::State as ConsensusState;
42
43pub mod state_dump;
44use state_dump::StateDump;
45
46/// Codec for consensus messages.
47///
48/// This trait is automatically implemented for any type that implements:
49/// - [`codec::Codec<Ctx::ProposalPart>`]
50/// - [`codec::Codec<SignedConsensusMsg<Ctx>>`]
51/// - [`codec::Codec<PolkaCertificate<Ctx>>`]
52/// - [`codec::Codec<StreamMessage<Ctx::ProposalPart>>`]
53pub trait ConsensusCodec<Ctx>
54where
55    Ctx: Context,
56    Self: codec::Codec<Ctx::ProposalPart>,
57    Self: codec::Codec<SignedConsensusMsg<Ctx>>,
58    Self: codec::Codec<LivenessMsg<Ctx>>,
59    Self: codec::Codec<StreamMessage<Ctx::ProposalPart>>,
60{
61}
62
63impl<Ctx, Codec> ConsensusCodec<Ctx> for Codec
64where
65    Ctx: Context,
66    Self: codec::Codec<Ctx::ProposalPart>,
67    Self: codec::Codec<SignedConsensusMsg<Ctx>>,
68    Self: codec::Codec<LivenessMsg<Ctx>>,
69    Self: codec::Codec<StreamMessage<Ctx::ProposalPart>>,
70{
71}
72
/// Reference to the consensus actor: an [`ActorRef`] that accepts [`Msg`] values.
pub type ConsensusRef<Ctx> = ActorRef<Msg<Ctx>>;
74
75pub struct Consensus<Ctx>
76where
77    Ctx: Context,
78{
79    ctx: Ctx,
80    params: ConsensusParams<Ctx>,
81    consensus_config: ConsensusConfig,
82    signing_provider: Box<dyn SigningProvider<Ctx>>,
83    network: NetworkRef<Ctx>,
84    host: HostRef<Ctx>,
85    wal: WalRef<Ctx>,
86    sync: Arc<OutputPort<SyncMsg<Ctx>>>,
87    metrics: Metrics,
88    tx_event: TxEvent<Ctx>,
89    span: tracing::Span,
90}
91
/// Alias for [`Msg`], the message type handled by the consensus actor.
pub type ConsensusMsg<Ctx> = Msg<Ctx>;
93
94#[derive_where(Debug)]
95pub enum Msg<Ctx: Context> {
96    /// Start consensus for the given height and provided parameters.
97    StartHeight(Ctx::Height, HeightParams<Ctx>),
98
99    /// Received an event from the gossip layer
100    NetworkEvent(NetworkEvent<Ctx>),
101
102    /// A timeout has elapsed
103    TimeoutElapsed(TimeoutElapsed<Timeout>),
104
105    /// The proposal builder has built a value and can be used in a new proposal consensus message
106    ProposeValue(LocallyProposedValue<Ctx>),
107
108    /// Received and assembled the full value proposed by a validator
109    ReceivedProposedValue(ProposedValue<Ctx>, ValueOrigin),
110
111    /// Process a sync response
112    ProcessSyncResponse(CoreValueResponse<Ctx>),
113
114    /// Instructs consensus to restart at a given height with the provided parameters.
115    ///
116    /// On this input consensus resets the Write-Ahead Log.
117    ///
118    /// # Warning
119    /// This operation should be used with extreme caution as it can lead to safety violations:
120    /// 1. The application must clean all state associated with the height for which commit has failed
121    /// 2. Since consensus resets its write-ahead log, the node may equivocate on proposals and votes
122    ///    for the restarted height, potentially violating protocol safety
123    RestartHeight(Ctx::Height, HeightParams<Ctx>),
124
125    /// Request to dump the current consensus state
126    DumpState(RpcReplyPort<Option<StateDump<Ctx>>>),
127}
128
129impl<Ctx: Context> fmt::Display for Msg<Ctx> {
130    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131        match self {
132            Msg::StartHeight(height, params) => {
133                write!(f, "StartHeight(height={height} params={params:?})")
134            }
135            Msg::NetworkEvent(event) => match event {
136                NetworkEvent::Proposal(_, proposal) => write!(
137                    f,
138                    "NetworkEvent(Proposal height={} round={})",
139                    proposal.height(),
140                    proposal.round()
141                ),
142                NetworkEvent::ProposalPart(_, part) => {
143                    write!(f, "NetworkEvent(ProposalPart sequence={})", part.sequence)
144                }
145                NetworkEvent::Vote(_, vote) => write!(
146                    f,
147                    "NetworkEvent(Vote height={} round={})",
148                    vote.height(),
149                    vote.round()
150                ),
151                _ => write!(f, "NetworkEvent"),
152            },
153            Msg::TimeoutElapsed(timeout) => write!(f, "TimeoutElapsed({})", timeout.display_key()),
154            Msg::ProposeValue(value) => write!(
155                f,
156                "ProposeValue(height={} round={})",
157                value.height, value.round
158            ),
159            Msg::ReceivedProposedValue(value, origin) => write!(
160                f,
161                "ReceivedProposedValue(height={} round={} origin={origin:?})",
162                value.height, value.round
163            ),
164            Msg::ProcessSyncResponse(response) => {
165                write!(
166                    f,
167                    "ProcessSyncResponse(peer={} height={} value={})",
168                    response.peer, response.certificate.height, response.certificate.value_id
169                )
170            }
171            Msg::RestartHeight(height, params) => {
172                write!(f, "RestartHeight(height={height} params={params:?})")
173            }
174            Msg::DumpState(_) => write!(f, "DumpState"),
175        }
176    }
177}
178
179impl<Ctx: Context> From<NetworkEvent<Ctx>> for Msg<Ctx> {
180    fn from(event: NetworkEvent<Ctx>) -> Self {
181        Self::NetworkEvent(event)
182    }
183}
184
/// Shorthand for the input type fed to the core consensus state machine.
type ConsensusInput<Ctx> = malachitebft_core_consensus::Input<Ctx>;
186
187impl<Ctx: Context> From<TimeoutElapsed<Timeout>> for Msg<Ctx> {
188    fn from(msg: TimeoutElapsed<Timeout>) -> Self {
189        Msg::TimeoutElapsed(msg)
190    }
191}
192
/// Timer scheduler specialised to consensus [`Timeout`]s.
type Timers = TimerScheduler<Timeout>;
194
/// Lifecycle phase of the consensus actor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Phase {
    /// Consensus has not been started; a `Listening` network event
    /// received in this phase moves the actor to `Ready`.
    Unstarted,
    /// The network layer is listening and the host has been asked
    /// which height to start.
    Ready,
    /// A height has been started and any WAL replay has completed.
    Running,
    /// WAL entries for the started height are being replayed.
    Recovering,
}
202
/// Maximum number of messages to buffer while consensus is
/// in the `Unstarted` or `Recovering` phase.
///
/// Also used as the size of the fresh buffer created when buffered
/// messages are discarded on a height restart.
const MAX_BUFFER_SIZE: usize = 1024;
206
207pub struct State<Ctx: Context> {
208    /// Scheduler for timers
209    timers: Timers,
210
211    /// Timeouts for various consensus steps
212    timeouts: Ctx::Timeouts,
213
214    /// The state of the consensus state machine,
215    /// or `None` if consensus has not been started yet.
216    consensus: Option<ConsensusState<Ctx>>,
217
218    /// The set of peers we are connected to.
219    connected_peers: BTreeSet<PeerId>,
220
221    /// The current phase
222    phase: Phase,
223
224    /// A buffer of messages that were received while
225    /// consensus was `Unstarted` or in the `Recovering` phase
226    msg_buffer: MessageBuffer<Ctx>,
227}
228
229impl<Ctx> State<Ctx>
230where
231    Ctx: Context,
232{
233    pub fn height(&self) -> Ctx::Height {
234        self.consensus
235            .as_ref()
236            .map(|c| c.height())
237            .unwrap_or_default()
238    }
239
240    pub fn round(&self) -> Round {
241        self.consensus
242            .as_ref()
243            .map(|c| c.round())
244            .unwrap_or(Round::Nil)
245    }
246
247    fn set_phase(&mut self, phase: Phase) {
248        if self.phase != phase {
249            info!(prev = ?self.phase, new = ?phase, "Phase transition");
250            self.phase = phase;
251        }
252    }
253}
254
255struct HandlerState<'a, Ctx: Context> {
256    phase: Phase,
257    timers: &'a mut Timers,
258    timeouts: Ctx::Timeouts,
259}
260
261impl<Ctx> Consensus<Ctx>
262where
263    Ctx: Context,
264{
265    #[allow(clippy::too_many_arguments)]
266    pub async fn spawn(
267        ctx: Ctx,
268        params: ConsensusParams<Ctx>,
269        consensus_config: ConsensusConfig,
270        signing_provider: Box<dyn SigningProvider<Ctx>>,
271        network: NetworkRef<Ctx>,
272        host: HostRef<Ctx>,
273        wal: WalRef<Ctx>,
274        sync: Arc<OutputPort<SyncMsg<Ctx>>>,
275        metrics: Metrics,
276        tx_event: TxEvent<Ctx>,
277        span: tracing::Span,
278    ) -> Result<ActorRef<Msg<Ctx>>, ractor::SpawnErr> {
279        let node = Self {
280            ctx,
281            params,
282            consensus_config,
283            signing_provider,
284            network,
285            host,
286            wal,
287            sync,
288            metrics,
289            tx_event,
290            span,
291        };
292
293        let (actor_ref, _) = Actor::spawn(None, node, ()).await?;
294        Ok(actor_ref)
295    }
296
    /// Feeds `input` to the core consensus state machine and handles the
    /// effects it yields.
    ///
    /// Each effect is passed to `handle_effect` together with a
    /// [`HandlerState`] built from the current phase, timers and timeouts.
    ///
    /// # Panics
    /// Panics with "Consensus not started" if `state.consensus` is `None`.
    ///
    /// # Errors
    /// Returns the [`ConsensusError`] produced by the `process!` invocation.
    async fn process_input(
        &self,
        myself: &ActorRef<Msg<Ctx>>,
        state: &mut State<Ctx>,
        input: ConsensusInput<Ctx>,
    ) -> Result<(), ConsensusError<Ctx>> {
        malachitebft_core_consensus::process!(
            input: input,
            state: state.consensus.as_mut().expect("Consensus not started"),
            metrics: &self.metrics,
            with: effect => {
                // Built per effect: only the pieces of `State` the handler uses
                let handler_state = HandlerState {
                    phase: state.phase,
                    timers: &mut state.timers,
                    timeouts: state.timeouts,
                };

                self.handle_effect(myself, handler_state, effect).await
            }
        )
    }
318
319    #[async_recursion]
320    async fn process_buffered_msgs(
321        &self,
322        myself: &ActorRef<Msg<Ctx>>,
323        state: &mut State<Ctx>,
324        is_restart: bool,
325    ) {
326        if state.msg_buffer.is_empty() {
327            return;
328        }
329
330        if is_restart {
331            state.msg_buffer = MessageBuffer::new(MAX_BUFFER_SIZE);
332        }
333
334        info!(count = %state.msg_buffer.len(), "Replaying buffered messages");
335
336        while let Some(msg) = state.msg_buffer.pop() {
337            debug!("Replaying buffered message: {msg}");
338
339            if let Err(e) = self.handle_msg(myself.clone(), state, msg).await {
340                error!("Error when handling buffered message: {e:?}");
341            }
342        }
343    }
344
    /// Handles a single message sent to the consensus actor.
    ///
    /// Dispatches on the message variant:
    /// - `StartHeight` / `RestartHeight`: creates the consensus state on first use,
    ///   fetches (or, on restart, resets) the WAL, starts the height, replays any
    ///   WAL entries, switches to the `Running` phase, notifies sync and finally
    ///   processes buffered messages.
    /// - `ProposeValue`, `ReceivedProposedValue`, `ProcessSyncResponse`,
    ///   `TimeoutElapsed`: forwarded to the consensus state machine.
    /// - `NetworkEvent`: tracks connected peers, forwards votes, proposals and
    ///   certificates to the state machine, and hands proposal parts to the host.
    /// - `DumpState`: replies with a dump of the consensus state, if started.
    ///
    /// Errors returned by the state machine are logged and swallowed.
    ///
    /// # Errors
    /// Returns an error when the validator set provided for a height is empty,
    /// or when a request cannot be forwarded to the host.
    async fn handle_msg(
        &self,
        myself: ActorRef<Msg<Ctx>>,
        state: &mut State<Ctx>,
        msg: Msg<Ctx>,
    ) -> Result<(), ActorProcessingErr> {
        // Computed up front because `msg` is consumed by the match below
        let is_restart = matches!(msg, Msg::RestartHeight(_, _));

        match msg {
            Msg::StartHeight(height, params) | Msg::RestartHeight(height, params) => {
                // Check that the validator set is provided and that it is not empty
                if params.validator_set.count() == 0 {
                    return Err(eyre!("Validator set for height {height} is empty").into());
                }

                // Initialize consensus state if this is the first height we start
                if state.consensus.is_none() {
                    state.consensus = Some(ConsensusState::new(
                        self.ctx.clone(),
                        height,
                        params.validator_set.clone(),
                        self.params.clone(),
                        self.consensus_config.queue_capacity,
                    ));
                }

                self.tx_event
                    .send(|| Event::StartedHeight(height, is_restart));

                // Push validator set to network layer
                if let Err(e) = self
                    .network
                    .cast(NetworkMsg::UpdateValidatorSet(params.validator_set.clone()))
                {
                    error!(%height, "Error pushing validator set to network layer: {e}");
                }

                // Fetch entries from the WAL or reset the WAL if this is a restart
                // NOTE(review): `hang_on_failure` is defined elsewhere; presumably it never
                // returns when the wrapped future fails — confirm against its definition.
                let wal_entries = if is_restart {
                    hang_on_failure(self.wal_reset(height), |e| {
                        error!(%height, "Error when resetting WAL: {e}");
                        error!(%height, "Consensus may be in an inconsistent state after WAL reset failure");
                    })
                    .await;

                    vec![]
                } else {
                    hang_on_failure(self.wal_fetch(height), |e| {
                        error!(%height, "Error when fetching WAL entries: {e}");
                        error!(%height, "Consensus may be in an inconsistent state after WAL fetch failure");
                    })
                    .await
                };

                if !wal_entries.is_empty() {
                    // Set the phase to `Recovering` while we replay the WAL
                    state.set_phase(Phase::Recovering);
                }

                // Update the timeouts
                state.timeouts = params.timeouts;

                // Start consensus for the given height
                let result = self
                    .process_input(
                        &myself,
                        state,
                        ConsensusInput::StartHeight(
                            height,
                            params.validator_set,
                            is_restart,
                            params.target_time,
                        ),
                    )
                    .await;

                // A failure to start the height is logged only; the steps below still run
                if let Err(e) = result {
                    error!(%height, "Error when starting height: {e}");
                }

                if !wal_entries.is_empty() {
                    hang_on_failure(self.wal_replay(&myself, state, height, wal_entries), |e| {
                        error!(%height, "Error when replaying WAL: {e}");
                        error!(%height, "Consensus may be in an inconsistent state after WAL replay failure");
                    })
                    .await;
                }

                // Set the phase to `Running` now that we have replayed the WAL
                state.set_phase(Phase::Running);

                // Notify the sync actor that we have started a new height.
                // We want the sync actor to drain buffered values only after consensus is ready and running.
                let start_type = HeightStartType::from_is_restart(is_restart);
                self.sync.send(SyncMsg::StartedHeight(height, start_type));

                // Process any buffered messages, now that we are in the `Running` phase
                self.process_buffered_msgs(&myself, state, is_restart).await;

                Ok(())
            }

            Msg::ProposeValue(value) => {
                let result = self
                    .process_input(&myself, state, ConsensusInput::Propose(value.clone()))
                    .await;

                if let Err(e) = result {
                    error!(
                        height = %value.height, round = %value.round,
                        "Error when processing ProposeValue message: {e}"
                    );
                }

                // The event is emitted whether or not processing succeeded
                self.tx_event.send(|| Event::ProposedValue(value));

                Ok(())
            }

            Msg::NetworkEvent(event) => {
                match event {
                    NetworkEvent::Listening(address) => {
                        info!(%address, "Listening");

                        // Only the first `Listening` event (while `Unstarted`) asks the host
                        // for the height to start; the reply comes back as `StartHeight`.
                        if state.phase == Phase::Unstarted {
                            state.set_phase(Phase::Ready);

                            self.host.call_and_forward(
                                |reply_to| HostMsg::ConsensusReady { reply_to },
                                &myself,
                                |(height, params)| ConsensusMsg::StartHeight(height, params),
                                None,
                            )?;
                        }
                    }

                    NetworkEvent::PeerConnected(peer_id) => {
                        if !state.connected_peers.insert(peer_id) {
                            // We already saw that peer, ignoring...
                            return Ok(());
                        }

                        info!(%peer_id, total = %state.connected_peers.len(), "Connected to peer");

                        self.metrics.connected_peers.inc();
                    }

                    NetworkEvent::PeerDisconnected(peer_id) => {
                        info!(%peer_id, "Disconnected from peer");

                        // Only decrement the gauge for peers we were actually tracking
                        if state.connected_peers.remove(&peer_id) {
                            self.metrics.connected_peers.dec();
                        }
                    }

                    NetworkEvent::Vote(from, vote) => {
                        self.tx_event
                            .send(|| Event::Received(SignedConsensusMsg::Vote(vote.clone())));

                        if let Err(e) = self
                            .process_input(&myself, state, ConsensusInput::Vote(vote))
                            .await
                        {
                            error!(%from, "Error when processing vote: {e}");
                        }
                    }

                    NetworkEvent::Proposal(from, proposal) => {
                        self.tx_event.send(|| {
                            Event::Received(SignedConsensusMsg::Proposal(proposal.clone()))
                        });

                        // Proposal messages are rejected when values travel as parts only
                        if self.params.value_payload.parts_only() {
                            error!(%from, "Properly configured peer should never send proposal messages in BlockPart mode");
                            return Ok(());
                        }

                        if let Err(e) = self
                            .process_input(&myself, state, ConsensusInput::Proposal(proposal))
                            .await
                        {
                            error!(%from, "Error when processing proposal: {e}");
                        }
                    }

                    NetworkEvent::PolkaCertificate(from, certificate) => {
                        if let Err(e) = self
                            .process_input(
                                &myself,
                                state,
                                ConsensusInput::PolkaCertificate(certificate),
                            )
                            .await
                        {
                            error!(%from, "Error when processing polka certificate: {e}");
                        }
                    }

                    NetworkEvent::RoundCertificate(from, certificate) => {
                        if let Err(e) = self
                            .process_input(
                                &myself,
                                state,
                                ConsensusInput::RoundCertificate(certificate),
                            )
                            .await
                        {
                            error!(%from, "Error when processing round certificate: {e}");
                        }
                    }

                    NetworkEvent::ProposalPart(from, part) => {
                        // Proposal parts are rejected when values travel in proposals only
                        if self.params.value_payload.proposal_only() {
                            error!(%from, "Properly configured peer should never send proposal part messages in Proposal mode");
                            return Ok(());
                        }

                        // Hand the part to the host; its reply is forwarded back to this
                        // actor as `ReceivedProposedValue` with `ValueOrigin::Consensus`.
                        self.host
                            .call_and_forward(
                                |reply_to| HostMsg::ReceivedProposalPart {
                                    from,
                                    part,
                                    reply_to,
                                },
                                &myself,
                                move |value| {
                                    Msg::ReceivedProposedValue(value, ValueOrigin::Consensus)
                                },
                                None,
                            )
                            .map_err(|e| {
                                eyre!("Error when forwarding proposal parts to host: {e}")
                            })?;
                    }

                    // All other network events are ignored
                    _ => {}
                }

                Ok(())
            }

            Msg::TimeoutElapsed(elapsed) => {
                let Some(timeout) = state.timers.intercept_timer_msg(elapsed) else {
                    // Timer was cancelled or already processed, ignore
                    return Ok(());
                };

                if let Err(e) = self.timeout_elapsed(&myself, state, timeout).await {
                    error!("Error when processing TimeoutElapsed message: {e:?}");
                }

                Ok(())
            }

            Msg::ReceivedProposedValue(value, origin) => {
                self.tx_event
                    .send(|| Event::ReceivedProposedValue(value.clone(), origin));

                let result = self
                    .process_input(&myself, state, ConsensusInput::ProposedValue(value, origin))
                    .await;

                if let Err(e) = result {
                    error!("Error when processing ReceivedProposedValue message: {e}");
                }

                Ok(())
            }

            Msg::ProcessSyncResponse(response) => {
                // Copy out the fields used for logging before `response` is moved below
                let height = response.certificate.height;
                let round = response.certificate.round;
                let value = response.certificate.value_id.clone();
                let peer = response.peer;

                debug!(
                    %height, %round, %value, %peer,
                    "Processing sync response"
                );

                if let Err(e) = self
                    .process_input(&myself, state, ConsensusInput::SyncValueResponse(response))
                    .await
                {
                    error!(
                        %height, %round, %value, %peer,
                        "Failed to process sync response: {e:?}"
                    );
                }

                Ok(())
            }

            Msg::DumpState(reply_to) => {
                let state_dump = if let Some(consensus) = &state.consensus {
                    info!(
                        height = %consensus.height(),
                        round  = %consensus.round(),
                        "Dumping consensus state"
                    );

                    Some(StateDump::new(consensus))
                } else {
                    info!("Dumping consensus state: not started");
                    None
                };

                if let Err(e) = reply_to.send(state_dump) {
                    error!("Failed to reply with state dump: {e}");
                }

                Ok(())
            }
        }
    }
660
661    async fn timeout_elapsed(
662        &self,
663        myself: &ActorRef<Msg<Ctx>>,
664        state: &mut State<Ctx>,
665        timeout: Timeout,
666    ) -> Result<(), ConsensusError<Ctx>> {
667        // Make sure the associated timer is cancelled
668        state.timers.cancel(&timeout);
669
670        // Print debug information if the timeout is for a prevote or precommit
671        if matches!(
672            timeout.kind,
673            TimeoutKind::Prevote | TimeoutKind::Precommit | TimeoutKind::Rebroadcast
674        ) {
675            info!(step = ?timeout.kind, "Timeout elapsed");
676
677            state.consensus.as_ref().inspect(|consensus| {
678                consensus.print_state();
679            });
680        }
681
682        // Process the timeout event
683        self.process_input(myself, state, ConsensusInput::TimeoutElapsed(timeout))
684            .await?;
685
686        Ok(())
687    }
688
689    async fn wal_reset(&self, height: Ctx::Height) -> Result<(), ActorProcessingErr> {
690        let result = ractor::call!(self.wal, WalMsg::Reset, height);
691
692        match result {
693            Ok(Ok(())) => {
694                // Success
695            }
696            Ok(Err(e)) => {
697                error!(%height, "Failed to reset WAL: {e}");
698                return Err(e
699                    .wrap_err(format!("Failed to reset WAL for height {height}"))
700                    .into());
701            }
702            Err(e) => {
703                error!(%height, "Failed to send Reset command to WAL actor: {e}");
704                return Err(eyre!(e)
705                    .wrap_err(format!(
706                        "Failed to send Reset command to WAL actor for height {height}"
707                    ))
708                    .into());
709            }
710        }
711
712        Ok(())
713    }
714
715    async fn wal_fetch(
716        &self,
717        height: Ctx::Height,
718    ) -> Result<Vec<io::Result<WalEntry<Ctx>>>, ActorProcessingErr> {
719        let result = ractor::call!(self.wal, WalMsg::StartedHeight, height)?;
720
721        match result {
722            Ok(entries) if entries.is_empty() => {
723                debug!(%height, "No WAL entries to replay");
724
725                // Nothing to replay
726                Ok(Vec::new())
727            }
728
729            Ok(entries) => {
730                info!("Found {} WAL entries", entries.len());
731
732                Ok(entries)
733            }
734
735            Err(e) => {
736                error!(%height, "Error when notifying WAL of started height: {e}");
737
738                self.tx_event.send(|| Event::WalResetError(Arc::new(e)));
739
740                Err(eyre!("Failed to fetch WAL entries for height {height}").into())
741            }
742        }
743    }
744
745    async fn wal_replay(
746        &self,
747        myself: &ActorRef<Msg<Ctx>>,
748        state: &mut State<Ctx>,
749        height: Ctx::Height,
750        entries: Vec<io::Result<WalEntry<Ctx>>>,
751    ) -> Result<(), Arc<ConsensusError<Ctx>>> {
752        use SignedConsensusMsg::*;
753
754        assert_eq!(state.phase, Phase::Recovering);
755
756        if entries.is_empty() {
757            return Ok(());
758        }
759
760        info!("Replaying {} WAL entries", entries.len());
761
762        self.tx_event
763            .send(|| Event::WalReplayBegin(height, entries.len()));
764
765        // Replay WAL entries, stopping at the first corrupted entry
766        for entry in entries {
767            let entry = match entry {
768                Ok(entry) => entry,
769                Err(e) => {
770                    error!("Corrupted WAL entry encountered: {e}");
771
772                    let error = Arc::new(e);
773
774                    // Report corrupted entries if any were found
775                    self.tx_event
776                        .send(|| Event::WalCorrupted(Arc::clone(&error)));
777
778                    return Err(Arc::new(ConsensusError::WalCorrupted(error)));
779                }
780            };
781
782            self.tx_event.send(|| Event::WalReplayEntry(entry.clone()));
783
784            match entry {
785                WalEntry::ConsensusMsg(Vote(vote)) => {
786                    info!("Replaying vote: {vote:?}");
787
788                    if let Err(e) = self
789                        .process_input(myself, state, ConsensusInput::Vote(vote))
790                        .await
791                    {
792                        error!("Error when replaying vote: {e}");
793
794                        let e = Arc::new(e);
795                        self.tx_event.send({
796                            let e = Arc::clone(&e);
797                            || Event::WalReplayError(e)
798                        });
799
800                        return Err(e);
801                    }
802                }
803
804                WalEntry::ConsensusMsg(Proposal(proposal)) => {
805                    info!("Replaying proposal: {proposal:?}");
806
807                    if let Err(e) = self
808                        .process_input(myself, state, ConsensusInput::Proposal(proposal))
809                        .await
810                    {
811                        error!("Error when replaying Proposal: {e}");
812
813                        let e = Arc::new(e);
814                        self.tx_event.send({
815                            let e = Arc::clone(&e);
816                            || Event::WalReplayError(e)
817                        });
818
819                        return Err(e);
820                    }
821                }
822
823                WalEntry::Timeout(timeout) => {
824                    info!("Replaying timeout: {timeout:?}");
825
826                    if let Err(e) = self.timeout_elapsed(myself, state, timeout).await {
827                        error!("Error when replaying TimeoutElapsed: {e}");
828
829                        let e = Arc::new(e);
830                        self.tx_event.send({
831                            let e = Arc::clone(&e);
832                            || Event::WalReplayError(e)
833                        });
834
835                        return Err(e);
836                    }
837                }
838
839                WalEntry::ProposedValue(value) => {
840                    info!("Replaying proposed value: {value:?}");
841
842                    if let Err(e) = self
843                        .process_input(
844                            myself,
845                            state,
846                            ConsensusInput::ProposedValue(value, ValueOrigin::Consensus),
847                        )
848                        .await
849                    {
850                        error!("Error when replaying LocallyProposedValue: {e}");
851
852                        let e = Arc::new(e);
853                        self.tx_event.send({
854                            let e = Arc::clone(&e);
855                            || Event::WalReplayError(e)
856                        });
857
858                        return Err(e);
859                    }
860                }
861            }
862        }
863
864        self.tx_event.send(|| Event::WalReplayDone(state.height()));
865
866        Ok(())
867    }
868
869    fn get_value(
870        &self,
871        myself: &ActorRef<Msg<Ctx>>,
872        height: Ctx::Height,
873        round: Round,
874        timeout: Duration,
875    ) -> Result<(), ActorProcessingErr> {
876        // Call `GetValue` on the Host actor, and forward the reply
877        // to the current actor, wrapping it in `Msg::ProposeValue`.
878        self.host.call_and_forward(
879            |reply_to| HostMsg::GetValue {
880                height,
881                round,
882                timeout,
883                reply_to,
884            },
885            myself,
886            Msg::<Ctx>::ProposeValue,
887            None,
888        )?;
889
890        Ok(())
891    }
892
893    async fn extend_vote(
894        &self,
895        height: Ctx::Height,
896        round: Round,
897        value_id: ValueId<Ctx>,
898    ) -> Result<Option<Ctx::Extension>, ActorProcessingErr> {
899        ractor::call!(self.host, |reply_to| HostMsg::ExtendVote {
900            height,
901            round,
902            value_id,
903            reply_to
904        })
905        .map_err(|e| eyre!("Failed to get earliest block height: {e:?}").into())
906    }
907
908    async fn verify_vote_extension(
909        &self,
910        height: Ctx::Height,
911        round: Round,
912        value_id: ValueId<Ctx>,
913        extension: Ctx::Extension,
914    ) -> Result<Result<(), VoteExtensionError>, ActorProcessingErr> {
915        ractor::call!(self.host, |reply_to| HostMsg::VerifyVoteExtension {
916            height,
917            round,
918            value_id,
919            extension,
920            reply_to
921        })
922        .map_err(|e| eyre!("Failed to verify vote extension: {e:?}").into())
923    }
924
925    async fn wal_append(
926        &self,
927        height: Ctx::Height,
928        entry: WalEntry<Ctx>,
929        phase: Phase,
930    ) -> Result<(), ActorProcessingErr> {
931        if phase == Phase::Recovering {
932            return Ok(());
933        }
934
935        let result = ractor::call!(self.wal, WalMsg::Append, height, entry);
936
937        match result {
938            Ok(Ok(())) => {
939                // Success
940            }
941            Ok(Err(e)) => {
942                error!("Failed to append entry to WAL: {e}");
943            }
944            Err(e) => {
945                error!("Failed to send Append command to WAL actor: {e}");
946            }
947        }
948
949        Ok(())
950    }
951
952    async fn wal_flush(&self, phase: Phase) -> Result<(), ActorProcessingErr> {
953        if phase == Phase::Recovering {
954            return Ok(());
955        }
956
957        let result = ractor::call!(self.wal, WalMsg::Flush);
958
959        match result {
960            Ok(Ok(())) => {
961                // Success
962            }
963            Ok(Err(e)) => {
964                error!("Failed to flush WAL to disk: {e}");
965            }
966            Err(e) => {
967                error!("Failed to send Flush command to WAL: {e}");
968            }
969        }
970
971        Ok(())
972    }
973
974    async fn handle_effect(
975        &self,
976        myself: &ActorRef<Msg<Ctx>>,
977        state: HandlerState<'_, Ctx>,
978        effect: Effect<Ctx>,
979    ) -> Result<Resume<Ctx>, ActorProcessingErr> {
980        match effect {
981            Effect::CancelAllTimeouts(r) => {
982                state.timers.cancel_all();
983                Ok(r.resume_with(()))
984            }
985
986            Effect::CancelTimeout(timeout, r) => {
987                state.timers.cancel(&timeout);
988                Ok(r.resume_with(()))
989            }
990
991            Effect::ScheduleTimeout(timeout, r) => {
992                let duration = state.timeouts.duration_for(timeout);
993                state.timers.start_timer(timeout, duration);
994
995                Ok(r.resume_with(()))
996            }
997
998            Effect::StartRound(height, round, proposer, role, r) => {
999                self.wal_flush(state.phase).await?;
1000
1001                let undecided_values =
1002                    ractor::call!(self.host, |reply_to| HostMsg::StartedRound {
1003                        height,
1004                        round,
1005                        proposer: proposer.clone(),
1006                        role,
1007                        reply_to,
1008                    })?;
1009
1010                for value in undecided_values {
1011                    let _ = myself.cast(Msg::ReceivedProposedValue(value, ValueOrigin::Consensus));
1012                }
1013
1014                self.tx_event
1015                    .send(|| Event::StartedRound(height, round, proposer, role));
1016
1017                Ok(r.resume_with(()))
1018            }
1019
1020            Effect::SignProposal(proposal, r) => {
1021                let start = Instant::now();
1022
1023                let signed_proposal = self.signing_provider.sign_proposal(proposal).await?;
1024
1025                self.metrics
1026                    .signature_signing_time
1027                    .observe(start.elapsed().as_secs_f64());
1028
1029                Ok(r.resume_with(signed_proposal))
1030            }
1031
1032            Effect::SignVote(vote, r) => {
1033                let start = Instant::now();
1034
1035                let signed_vote = self.signing_provider.sign_vote(vote).await?;
1036
1037                self.metrics
1038                    .signature_signing_time
1039                    .observe(start.elapsed().as_secs_f64());
1040
1041                Ok(r.resume_with(signed_vote))
1042            }
1043
1044            Effect::VerifySignature(msg, pk, r) => {
1045                use malachitebft_core_consensus::ConsensusMsg as Msg;
1046
1047                let start = Instant::now();
1048
1049                let result = match msg.message {
1050                    Msg::Vote(v) => {
1051                        self.signing_provider
1052                            .verify_signed_vote(&v, &msg.signature, &pk)
1053                            .await?
1054                    }
1055                    Msg::Proposal(p) => {
1056                        self.signing_provider
1057                            .verify_signed_proposal(&p, &msg.signature, &pk)
1058                            .await?
1059                    }
1060                };
1061
1062                self.metrics
1063                    .signature_verification_time
1064                    .observe(start.elapsed().as_secs_f64());
1065
1066                Ok(r.resume_with(result.is_valid()))
1067            }
1068
1069            Effect::VerifyCommitCertificate(certificate, validator_set, thresholds, r) => {
1070                let result = self
1071                    .signing_provider
1072                    .verify_commit_certificate(&self.ctx, &certificate, &validator_set, thresholds)
1073                    .await;
1074
1075                Ok(r.resume_with(result))
1076            }
1077
1078            Effect::VerifyPolkaCertificate(certificate, validator_set, thresholds, r) => {
1079                let result = self
1080                    .signing_provider
1081                    .verify_polka_certificate(&self.ctx, &certificate, &validator_set, thresholds)
1082                    .await;
1083
1084                Ok(r.resume_with(result))
1085            }
1086
1087            Effect::VerifyRoundCertificate(certificate, validator_set, thresholds, r) => {
1088                let result = self
1089                    .signing_provider
1090                    .verify_round_certificate(&self.ctx, &certificate, &validator_set, thresholds)
1091                    .await;
1092
1093                Ok(r.resume_with(result))
1094            }
1095
1096            Effect::ExtendVote(height, round, value_id, r) => {
1097                if let Some(extension) = self.extend_vote(height, round, value_id).await? {
1098                    let signed_extension = self
1099                        .signing_provider
1100                        .sign_vote_extension(extension)
1101                        .await
1102                        .inspect_err(|e| {
1103                            error!("Failed to sign vote extension: {e}");
1104                        })
1105                        .ok(); // Discard the vote extension if signing fails
1106
1107                    Ok(r.resume_with(signed_extension))
1108                } else {
1109                    Ok(r.resume_with(None))
1110                }
1111            }
1112
1113            Effect::VerifyVoteExtension(height, round, value_id, signed_extension, pk, r) => {
1114                let result = self
1115                    .signing_provider
1116                    .verify_signed_vote_extension(
1117                        &signed_extension.message,
1118                        &signed_extension.signature,
1119                        &pk,
1120                    )
1121                    .await?;
1122
1123                if result.is_invalid() {
1124                    return Ok(r.resume_with(Err(VoteExtensionError::InvalidSignature)));
1125                }
1126
1127                let result = self
1128                    .verify_vote_extension(height, round, value_id, signed_extension.message)
1129                    .await?;
1130
1131                Ok(r.resume_with(result))
1132            }
1133
1134            Effect::PublishConsensusMsg(msg, r) => {
1135                // Sync the WAL to disk before we broadcast the message
1136                // NOTE: The message has already been append to the WAL by the `WalAppend` effect.
1137                self.wal_flush(state.phase).await?;
1138
1139                // Notify any subscribers that we are about to publish a message
1140                self.tx_event.send(|| Event::Published(msg.clone()));
1141
1142                self.network
1143                    .cast(NetworkMsg::PublishConsensusMsg(msg))
1144                    .map_err(|e| eyre!("Error when broadcasting consensus message: {e:?}"))?;
1145
1146                Ok(r.resume_with(()))
1147            }
1148
1149            Effect::PublishLivenessMsg(msg, r) => {
1150                match msg {
1151                    LivenessMsg::Vote(ref msg) => {
1152                        self.tx_event.send(|| Event::RepublishVote(msg.clone()));
1153                    }
1154                    LivenessMsg::PolkaCertificate(ref certificate) => {
1155                        self.tx_event
1156                            .send(|| Event::PolkaCertificate(certificate.clone()));
1157                    }
1158                    LivenessMsg::SkipRoundCertificate(ref certificate) => {
1159                        self.tx_event
1160                            .send(|| Event::SkipRoundCertificate(certificate.clone()));
1161                    }
1162                }
1163
1164                self.network
1165                    .cast(NetworkMsg::PublishLivenessMsg(msg))
1166                    .map_err(|e| eyre!("Error when broadcasting liveness message: {e:?}"))?;
1167
1168                Ok(r.resume_with(()))
1169            }
1170
1171            Effect::RepublishVote(msg, r) => {
1172                // Notify any subscribers that we are about to rebroadcast a vote
1173                self.tx_event.send(|| Event::RepublishVote(msg.clone()));
1174
1175                self.network
1176                    .cast(NetworkMsg::PublishLivenessMsg(LivenessMsg::Vote(msg)))
1177                    .map_err(|e| eyre!("Error when rebroadcasting vote message: {e:?}"))?;
1178
1179                Ok(r.resume_with(()))
1180            }
1181
1182            Effect::RepublishRoundCertificate(certificate, r) => {
1183                // Notify any subscribers that we are about to rebroadcast a round certificate
1184                self.tx_event
1185                    .send(|| Event::RebroadcastRoundCertificate(certificate.clone()));
1186
1187                self.network
1188                    .cast(NetworkMsg::PublishLivenessMsg(
1189                        LivenessMsg::SkipRoundCertificate(certificate),
1190                    ))
1191                    .map_err(|e| {
1192                        eyre!("Error when rebroadcasting round certificate message: {e:?}")
1193                    })?;
1194
1195                Ok(r.resume_with(()))
1196            }
1197
1198            Effect::GetValue(height, round, timeout, r) => {
1199                let timeout_duration = state.timeouts.duration_for(timeout);
1200
1201                self.get_value(myself, height, round, timeout_duration)
1202                    .map_err(|e| {
1203                        eyre!("Error when asking application for value to propose: {e:?}")
1204                    })?;
1205
1206                Ok(r.resume_with(()))
1207            }
1208
1209            Effect::RestreamProposal(height, round, valid_round, address, value_id, r) => {
1210                self.host
1211                    .cast(HostMsg::RestreamValue {
1212                        height,
1213                        round,
1214                        valid_round,
1215                        address,
1216                        value_id,
1217                    })
1218                    .map_err(|e| eyre!("Error when sending decided value to host: {e:?}"))?;
1219
1220                Ok(r.resume_with(()))
1221            }
1222
1223            Effect::Decide(certificate, extensions, r) => {
1224                assert!(!certificate.commit_signatures.is_empty());
1225
1226                // Sync the WAL to disk before we decide the value
1227                self.wal_flush(state.phase).await?;
1228
1229                // Notify any subscribers about the decided value
1230                self.tx_event.send(|| Event::Decided {
1231                    commit_certificate: certificate.clone(),
1232                });
1233
1234                let height = certificate.height;
1235
1236                // Notify the host about the decided value
1237                // Finalization will follow, so don't request a reply
1238                self.host
1239                    .cast(HostMsg::Decided {
1240                        certificate,
1241                        extensions,
1242                    })
1243                    .map_err(|e| eyre!("Error when casting decided value to host: {e:?}"))?;
1244
1245                // Notify the sync actor about the decided height
1246                self.sync.send(SyncMsg::Decided(height));
1247
1248                Ok(r.resume_with(()))
1249            }
1250
1251            Effect::Finalize(certificate, extensions, evidence, r) => {
1252                assert!(!certificate.commit_signatures.is_empty());
1253
1254                // Update metrics for equivocation evidence
1255                let proposal_evidence_count = evidence
1256                    .proposals
1257                    .iter()
1258                    .map(|addr| evidence.proposals.get(addr).map_or(0, |v| v.len()))
1259                    .sum::<usize>();
1260                let vote_evidence_count = evidence
1261                    .votes
1262                    .iter()
1263                    .map(|addr| evidence.votes.get(addr).map_or(0, |v| v.len()))
1264                    .sum::<usize>();
1265                if proposal_evidence_count > 0 {
1266                    self.metrics
1267                        .equivocation_proposals
1268                        .inc_by(proposal_evidence_count as u64);
1269                }
1270                if vote_evidence_count > 0 {
1271                    self.metrics
1272                        .equivocation_votes
1273                        .inc_by(vote_evidence_count as u64);
1274                }
1275
1276                // Notify any subscribers about the finalized value
1277                self.tx_event.send(|| Event::Finalized {
1278                    commit_certificate: certificate.clone(),
1279                    evidence: evidence.clone(),
1280                });
1281
1282                info!(
1283                    height = %certificate.height,
1284                    round = %certificate.round,
1285                    total_signatures = certificate.commit_signatures.len(),
1286                    "Height finalized with extended certificate"
1287                );
1288
1289                // Notify the host about the finalized value
1290                self.host
1291                    .call_and_forward(
1292                        |reply_to| HostMsg::Finalized {
1293                            certificate,
1294                            extensions,
1295                            evidence,
1296                            reply_to,
1297                        },
1298                        myself,
1299                        |next| match next {
1300                            Next::Start(h, params) => Msg::StartHeight(h, params),
1301                            Next::Restart(h, params) => Msg::RestartHeight(h, params),
1302                        },
1303                        None,
1304                    )
1305                    .map_err(|e| eyre!("Error when sending finalized value to host: {e:?}"))?;
1306
1307                Ok(r.resume_with(()))
1308            }
1309
1310            Effect::InvalidSyncValue(peer, height, error, r) => {
1311                if let ConsensusError::InvalidCommitCertificate(certificate, e) = error {
1312                    error!(
1313                        %peer,
1314                        %certificate.height,
1315                        %certificate.round,
1316                        "Invalid certificate received: {e}"
1317                    );
1318
1319                    self.sync
1320                        .send(SyncMsg::InvalidValue(peer, certificate.height));
1321                } else {
1322                    self.sync.send(SyncMsg::ValueProcessingError(peer, height));
1323                }
1324
1325                Ok(r.resume_with(()))
1326            }
1327
1328            Effect::ValidSyncValue(value, proposer, r) => {
1329                let certificate_height = value.certificate.height;
1330                let certificate_round = value.certificate.round;
1331
1332                let sync = Arc::clone(&self.sync);
1333
1334                self.host.call_and_forward(
1335                    |reply_to| HostMsg::ProcessSyncedValue {
1336                        height: certificate_height,
1337                        round: certificate_round,
1338                        proposer,
1339                        value_bytes: value.value_bytes,
1340                        reply_to,
1341                    },
1342                    myself,
1343                    move |proposed| {
1344                        if proposed.validity == Validity::Invalid
1345                            || proposed.value.id() != value.certificate.value_id
1346                        {
1347                            sync.send(SyncMsg::InvalidValue(value.peer, certificate_height));
1348                        }
1349
1350                        Msg::<Ctx>::ReceivedProposedValue(proposed, ValueOrigin::Sync)
1351                    },
1352                    None,
1353                )?;
1354
1355                Ok(r.resume_with(()))
1356            }
1357
1358            Effect::WalAppend(height, entry, r) => {
1359                self.wal_append(height, entry, state.phase).await?;
1360                Ok(r.resume_with(()))
1361            }
1362        }
1363    }
1364}
1365
1366#[async_trait]
1367impl<Ctx> Actor for Consensus<Ctx>
1368where
1369    Ctx: Context,
1370{
1371    type Msg = Msg<Ctx>;
1372    type State = State<Ctx>;
1373    type Arguments = ();
1374
1375    #[tracing::instrument(
1376        name = "consensus",
1377        parent = &self.span,
1378        skip_all,
1379    )]
1380    async fn pre_start(
1381        &self,
1382        myself: ActorRef<Msg<Ctx>>,
1383        _args: (),
1384    ) -> Result<State<Ctx>, ActorProcessingErr> {
1385        info!("Consensus is starting");
1386
1387        self.network
1388            .cast(NetworkMsg::Subscribe(Box::new(myself.clone())))?;
1389
1390        Ok(State {
1391            timers: Timers::new(Box::new(myself)),
1392            timeouts: Ctx::Timeouts::default(),
1393            consensus: None,
1394            connected_peers: BTreeSet::new(),
1395            phase: Phase::Unstarted,
1396            msg_buffer: MessageBuffer::new(MAX_BUFFER_SIZE),
1397        })
1398    }
1399
1400    #[tracing::instrument(
1401        name = "consensus",
1402        parent = &self.span,
1403        skip_all,
1404        fields(height = %state.height(), round = %state.round())
1405    )]
1406    async fn post_start(
1407        &self,
1408        _myself: ActorRef<Msg<Ctx>>,
1409        state: &mut State<Ctx>,
1410    ) -> Result<(), ActorProcessingErr> {
1411        info!("Consensus has started");
1412
1413        state.timers.cancel_all();
1414        Ok(())
1415    }
1416
1417    #[tracing::instrument(
1418        name = "consensus",
1419        parent = &self.span,
1420        skip_all,
1421        fields(
1422            height = %span_height(state.height(), &msg),
1423            round = %span_round(state.round(), &msg)
1424        )
1425    )]
1426    async fn handle(
1427        &self,
1428        myself: ActorRef<Msg<Ctx>>,
1429        msg: Msg<Ctx>,
1430        state: &mut State<Ctx>,
1431    ) -> Result<(), ActorProcessingErr> {
1432        if state.phase != Phase::Running && should_buffer(&msg) {
1433            let _span = error_span!("buffer", phase = ?state.phase).entered();
1434            state.msg_buffer.buffer(msg);
1435            return Ok(());
1436        }
1437
1438        if let Err(e) = self.handle_msg(myself.clone(), state, msg).await {
1439            error!("Error when handling message: {e:?}");
1440        }
1441
1442        Ok(())
1443    }
1444
1445    #[tracing::instrument(
1446        name = "consensus",
1447        parent = &self.span,
1448        skip_all,
1449        fields(
1450            height = %state.height(),
1451            round = %state.round()
1452        )
1453    )]
1454    async fn post_stop(
1455        &self,
1456        _myself: ActorRef<Self::Msg>,
1457        state: &mut State<Ctx>,
1458    ) -> Result<(), ActorProcessingErr> {
1459        info!("Consensus has stopped");
1460        state.timers.cancel_all();
1461        Ok(())
1462    }
1463}
1464
1465fn should_buffer<Ctx: Context>(msg: &Msg<Ctx>) -> bool {
1466    !matches!(
1467        msg,
1468        Msg::StartHeight(..)
1469            | Msg::NetworkEvent(NetworkEvent::Listening(..))
1470            | Msg::NetworkEvent(NetworkEvent::PeerConnected(..))
1471            | Msg::NetworkEvent(NetworkEvent::PeerDisconnected(..))
1472    )
1473}
1474
1475/// Use the height we are about to start instead of the consensus state height
1476/// for the tracing span of the Consensus actor when starting a new height.
1477fn span_height<Ctx: Context>(height: Ctx::Height, msg: &Msg<Ctx>) -> Ctx::Height {
1478    if let Msg::StartHeight(h, _) = msg {
1479        *h
1480    } else {
1481        height
1482    }
1483}
1484
1485/// Use round 0 instead of the consensus state round for the tracing span of
1486/// the Consensus actor when starting a new height.
1487fn span_round<Ctx: Context>(round: Round, msg: &Msg<Ctx>) -> Round {
1488    if let Msg::StartHeight(_, _) = msg {
1489        Round::new(0)
1490    } else {
1491        round
1492    }
1493}
1494
1495async fn hang_on_failure<A, E>(
1496    f: impl Future<Output = Result<A, E>>,
1497    on_error: impl FnOnce(E),
1498) -> A {
1499    match f.await {
1500        Ok(value) => value,
1501        Err(e) => {
1502            on_error(e);
1503            error!("Critical consensus failure, hanging to prevent safety violations. Manual intervention required!");
1504            hang().await
1505        }
1506    }
1507}
1508
/// Suspends the calling task forever so that consensus cannot make progress.
///
/// Intended for the case where WAL operations fail and consensus cannot
/// safely continue: rather than risk a safety violation, the actor stays
/// parked. The node operator should investigate the WAL issue and restart
/// the node only after ensuring data integrity.
async fn hang() -> ! {
    // A future that is never ready: awaiting it never completes.
    let never_ready = pending::<()>();
    never_ready.await;

    unreachable!()
}