casper_node/components/
consensus.rs

1//! The consensus component. Provides distributed consensus among the nodes in the network.
2
3#![warn(clippy::arithmetic_side_effects)]
4
5mod cl_context;
6mod config;
7mod consensus_protocol;
8mod era_supervisor;
9#[macro_use]
10pub mod highway_core;
11pub(crate) mod error;
12mod leader_sequence;
13mod metrics;
14pub mod protocols;
15#[cfg(test)]
16pub(crate) mod tests;
17mod traits;
18pub mod utils;
19mod validator_change;
20
21use std::{
22    borrow::Cow,
23    fmt::{self, Debug, Display, Formatter},
24    sync::Arc,
25    time::Duration,
26};
27
28use datasize::DataSize;
29use derive_more::From;
30use serde::{Deserialize, Serialize};
31use tracing::{info, trace};
32
33use casper_types::{BlockHash, BlockHeader, EraId, Timestamp};
34
35use crate::{
36    components::Component,
37    effect::{
38        announcements::{
39            ConsensusAnnouncement, FatalAnnouncement, MetaBlockAnnouncement,
40            PeerBehaviorAnnouncement,
41        },
42        diagnostics_port::DumpConsensusStateRequest,
43        incoming::{ConsensusDemand, ConsensusMessageIncoming},
44        requests::{
45            BlockValidationRequest, ChainspecRawBytesRequest, ConsensusRequest,
46            ContractRuntimeRequest, NetworkInfoRequest, NetworkRequest, StorageRequest,
47            TransactionBufferRequest,
48        },
49        EffectBuilder, EffectExt, Effects,
50    },
51    failpoints::FailpointActivation,
52    protocol::Message,
53    reactor::ReactorEvent,
54    types::{BlockPayload, InvalidProposalError, NodeId},
55    NodeRng,
56};
57use protocols::{highway::HighwayProtocol, zug::Zug};
58use traits::Context;
59
60pub use cl_context::ClContext;
61pub(crate) use config::{ChainspecConsensusExt, Config};
62pub(crate) use consensus_protocol::{BlockContext, ProposedBlock};
63pub(crate) use era_supervisor::{debug::EraDump, EraSupervisor, SerializedMessage};
64#[cfg(test)]
65pub(crate) use highway_core::highway::Vertex as HighwayVertex;
66pub(crate) use leader_sequence::LeaderSequence;
67pub(crate) use protocols::highway::max_rounds_per_era;
68#[cfg(test)]
69pub(crate) use protocols::highway::HighwayMessage;
70
71const COMPONENT_NAME: &str = "consensus";
72
73#[allow(clippy::arithmetic_side_effects)]
74mod relaxed {
75    // This module exists solely to exempt the `EnumDiscriminants` macro generated code from the
76    // module-wide `clippy::arithmetic_side_effects` lint.
77
78    use casper_types::{EraId, PublicKey};
79    use datasize::DataSize;
80    use serde::{Deserialize, Serialize};
81    use strum::EnumDiscriminants;
82
83    use super::era_supervisor::SerializedMessage;
84
85    #[derive(DataSize, Clone, Serialize, Deserialize, EnumDiscriminants)]
86    #[strum_discriminants(derive(strum::EnumIter))]
87    pub(crate) enum ConsensusMessage {
88        /// A protocol message, to be handled by the instance in the specified era.
89        Protocol {
90            era_id: EraId,
91            payload: SerializedMessage,
92        },
93        /// A request for evidence against the specified validator, from any era that is still
94        /// bonded in `era_id`.
95        EvidenceRequest { era_id: EraId, pub_key: PublicKey },
96    }
97}
98pub(crate) use relaxed::{ConsensusMessage, ConsensusMessageDiscriminants};
99
100/// A request to be handled by the consensus protocol instance in a particular era.
101#[derive(DataSize, Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, From)]
102pub(crate) enum EraRequest<C>
103where
104    C: Context,
105{
106    Zug(protocols::zug::SyncRequest<C>),
107}
108
109/// A protocol request message, to be handled by the instance in the specified era.
110#[derive(DataSize, Clone, Serialize, Deserialize)]
111pub(crate) struct ConsensusRequestMessage {
112    era_id: EraId,
113    payload: SerializedMessage,
114}
115
116/// An ID to distinguish different timers. What they are used for is specific to each consensus
117/// protocol implementation.
118#[derive(DataSize, Clone, Copy, Debug, Eq, PartialEq, Hash)]
119pub struct TimerId(pub u8);
120
121/// An ID to distinguish queued actions. What they are used for is specific to each consensus
122/// protocol implementation.
123#[derive(DataSize, Clone, Copy, Debug, Eq, PartialEq, Hash)]
124pub struct ActionId(pub u8);
125
126/// Payload for a block to be proposed.
127#[derive(DataSize, Debug, From)]
128pub struct NewBlockPayload {
129    pub(crate) era_id: EraId,
130    pub(crate) block_payload: Arc<BlockPayload>,
131    pub(crate) block_context: BlockContext<ClContext>,
132}
133
134/// The result of validation of a ProposedBlock.
135#[derive(DataSize, Debug, From)]
136pub struct ResolveValidity {
137    era_id: EraId,
138    sender: NodeId,
139    proposed_block: ProposedBlock<ClContext>,
140    maybe_error: Option<Box<InvalidProposalError>>,
141}
142
143/// Consensus component event.
144#[derive(DataSize, Debug, From)]
145pub(crate) enum Event {
146    /// An incoming network message.
147    #[from]
148    Incoming(ConsensusMessageIncoming),
149    /// A variant used with failpoints - when a message arrives, we fire this event with a delay,
150    /// and it also causes the message to be handled.
151    DelayedIncoming(ConsensusMessageIncoming),
152    /// An incoming demand message.
153    #[from]
154    DemandIncoming(ConsensusDemand),
155    /// A scheduled event to be handled by a specified era.
156    Timer {
157        era_id: EraId,
158        timestamp: Timestamp,
159        timer_id: TimerId,
160    },
161    /// A queued action to be handled by a specific era.
162    Action { era_id: EraId, action_id: ActionId },
163    /// We are receiving the data we require to propose a new block.
164    NewBlockPayload(NewBlockPayload),
165    #[from]
166    ConsensusRequest(ConsensusRequest),
167    /// A new block has been added to the linear chain.
168    BlockAdded {
169        header: Box<BlockHeader>,
170        header_hash: BlockHash,
171    },
172    /// The proposed block has been validated.
173    ResolveValidity(ResolveValidity),
174    /// Deactivate the era with the given ID, unless the number of faulty validators increases.
175    DeactivateEra {
176        era_id: EraId,
177        faulty_num: usize,
178        delay: Duration,
179    },
180    /// Dump state for debugging purposes.
181    #[from]
182    DumpState(DumpConsensusStateRequest),
183}
184
185impl Debug for ConsensusMessage {
186    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
187        match self {
188            ConsensusMessage::Protocol { era_id, payload: _ } => {
189                write!(f, "Protocol {{ era_id: {:?}, .. }}", era_id)
190            }
191            ConsensusMessage::EvidenceRequest { era_id, pub_key } => f
192                .debug_struct("EvidenceRequest")
193                .field("era_id", era_id)
194                .field("pub_key", pub_key)
195                .finish(),
196        }
197    }
198}
199
200impl Display for ConsensusMessage {
201    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
202        match self {
203            ConsensusMessage::Protocol { era_id, payload } => {
204                write!(
205                    f,
206                    "protocol message ({} bytes) in {}",
207                    payload.as_raw().len(),
208                    era_id
209                )
210            }
211            ConsensusMessage::EvidenceRequest { era_id, pub_key } => write!(
212                f,
213                "request for evidence of fault by {} in {} or earlier",
214                pub_key, era_id,
215            ),
216        }
217    }
218}
219
220impl Debug for ConsensusRequestMessage {
221    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
222        write!(
223            f,
224            "ConsensusRequestMessage {{ era_id: {:?}, .. }}",
225            self.era_id
226        )
227    }
228}
229
230impl Display for ConsensusRequestMessage {
231    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
232        write!(f, "protocol request {:?} in {}", self.payload, self.era_id)
233    }
234}
235
236impl Display for Event {
237    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
238        match self {
239            Event::Incoming(ConsensusMessageIncoming { sender, message }) => {
240                write!(f, "message from {:?}: {}", sender, message)
241            }
242            Event::DelayedIncoming(ConsensusMessageIncoming { sender, message }) => {
243                write!(f, "delayed message from {:?}: {}", sender, message)
244            }
245            Event::DemandIncoming(demand) => {
246                write!(f, "demand from {:?}: {}", demand.sender, demand.request_msg)
247            }
248            Event::Timer {
249                era_id,
250                timestamp,
251                timer_id,
252            } => write!(
253                f,
254                "timer (ID {}) for {} scheduled for timestamp {}",
255                timer_id.0, era_id, timestamp,
256            ),
257            Event::Action { era_id, action_id } => {
258                write!(f, "action (ID {}) for {}", action_id.0, era_id)
259            }
260            Event::NewBlockPayload(NewBlockPayload {
261                era_id,
262                block_payload,
263                block_context,
264            }) => write!(
265                f,
266                "New proposed block for era {:?}: {:?}, {:?}",
267                era_id, block_payload, block_context
268            ),
269            Event::ConsensusRequest(request) => write!(
270                f,
271                "A request for consensus component hash been received: {:?}",
272                request
273            ),
274            Event::BlockAdded {
275                header: _,
276                header_hash,
277            } => write!(
278                f,
279                "A block has been added to the linear chain: {}",
280                header_hash,
281            ),
282            Event::ResolveValidity(ResolveValidity {
283                era_id,
284                sender,
285                proposed_block,
286                maybe_error,
287            }) => write!(
288                f,
289                "Proposed block received from {:?} for {} is {}: {:?}",
290                sender,
291                era_id,
292                if maybe_error.is_none() {
293                    "valid".to_string()
294                } else {
295                    format!("invalid ({:?})", maybe_error).to_string()
296                },
297                proposed_block,
298            ),
299            Event::DeactivateEra {
300                era_id, faulty_num, ..
301            } => write!(
302                f,
303                "Deactivate old {} unless additional faults are observed; faults so far: {}",
304                era_id, faulty_num
305            ),
306            Event::DumpState(req) => Display::fmt(req, f),
307        }
308    }
309}
310
311/// A helper trait whose bounds represent the requirements for a reactor event that `EraSupervisor`
312/// can work with.
313pub(crate) trait ReactorEventT:
314    ReactorEvent
315    + From<Event>
316    + Send
317    + From<NetworkRequest<Message>>
318    + From<ConsensusDemand>
319    + From<NetworkInfoRequest>
320    + From<TransactionBufferRequest>
321    + From<ConsensusAnnouncement>
322    + From<BlockValidationRequest>
323    + From<StorageRequest>
324    + From<ContractRuntimeRequest>
325    + From<ChainspecRawBytesRequest>
326    + From<PeerBehaviorAnnouncement>
327    + From<MetaBlockAnnouncement>
328    + From<FatalAnnouncement>
329{
330}
331
332impl<REv> ReactorEventT for REv where
333    REv: ReactorEvent
334        + From<Event>
335        + Send
336        + From<ConsensusDemand>
337        + From<NetworkRequest<Message>>
338        + From<NetworkInfoRequest>
339        + From<TransactionBufferRequest>
340        + From<ConsensusAnnouncement>
341        + From<BlockValidationRequest>
342        + From<StorageRequest>
343        + From<ContractRuntimeRequest>
344        + From<ChainspecRawBytesRequest>
345        + From<PeerBehaviorAnnouncement>
346        + From<MetaBlockAnnouncement>
347        + From<FatalAnnouncement>
348{
349}
350
351mod specimen_support {
352    use crate::utils::specimen::{largest_variant, Cache, LargestSpecimen, SizeEstimator};
353
354    use super::{
355        protocols::{highway, zug},
356        ClContext, ConsensusMessage, ConsensusMessageDiscriminants, ConsensusRequestMessage,
357        EraRequest, SerializedMessage,
358    };
359
360    impl LargestSpecimen for ConsensusMessage {
361        fn largest_specimen<E: SizeEstimator>(estimator: &E, cache: &mut Cache) -> Self {
362            largest_variant::<Self, ConsensusMessageDiscriminants, _, _>(estimator, |variant| {
363                match variant {
364                    ConsensusMessageDiscriminants::Protocol => {
365                        let zug_payload = SerializedMessage::from_message(
366                            &zug::Message::<ClContext>::largest_specimen(estimator, cache),
367                        );
368                        let highway_payload = SerializedMessage::from_message(
369                            &highway::HighwayMessage::<ClContext>::largest_specimen(
370                                estimator, cache,
371                            ),
372                        );
373
374                        let payload = if zug_payload.as_raw().len() > highway_payload.as_raw().len()
375                        {
376                            zug_payload
377                        } else {
378                            highway_payload
379                        };
380
381                        ConsensusMessage::Protocol {
382                            era_id: LargestSpecimen::largest_specimen(estimator, cache),
383                            payload,
384                        }
385                    }
386                    ConsensusMessageDiscriminants::EvidenceRequest => {
387                        ConsensusMessage::EvidenceRequest {
388                            era_id: LargestSpecimen::largest_specimen(estimator, cache),
389                            pub_key: LargestSpecimen::largest_specimen(estimator, cache),
390                        }
391                    }
392                }
393            })
394        }
395    }
396
397    impl LargestSpecimen for ConsensusRequestMessage {
398        fn largest_specimen<E: SizeEstimator>(estimator: &E, cache: &mut Cache) -> Self {
399            let zug_sync_request = SerializedMessage::from_message(
400                &zug::SyncRequest::<ClContext>::largest_specimen(estimator, cache),
401            );
402
403            ConsensusRequestMessage {
404                era_id: LargestSpecimen::largest_specimen(estimator, cache),
405                payload: zug_sync_request,
406            }
407        }
408    }
409
410    impl LargestSpecimen for EraRequest<ClContext> {
411        fn largest_specimen<E: SizeEstimator>(estimator: &E, cache: &mut Cache) -> Self {
412            EraRequest::Zug(LargestSpecimen::largest_specimen(estimator, cache))
413        }
414    }
415}
416
417impl<REv> Component<REv> for EraSupervisor
418where
419    REv: ReactorEventT,
420{
421    type Event = Event;
422
423    fn name(&self) -> &str {
424        COMPONENT_NAME
425    }
426
427    fn activate_failpoint(&mut self, activation: &FailpointActivation) {
428        self.message_delay_failpoint.update_from(activation);
429        self.proposal_delay_failpoint.update_from(activation);
430    }
431
432    fn handle_event(
433        &mut self,
434        effect_builder: EffectBuilder<REv>,
435        rng: &mut NodeRng,
436        event: Self::Event,
437    ) -> Effects<Self::Event> {
438        trace!("{:?}", event);
439        match event {
440            Event::Timer {
441                era_id,
442                timestamp,
443                timer_id,
444            } => self.handle_timer(effect_builder, rng, era_id, timestamp, timer_id),
445            Event::Action { era_id, action_id } => {
446                self.handle_action(effect_builder, rng, era_id, action_id)
447            }
448            Event::Incoming(ConsensusMessageIncoming { sender, message }) => {
449                let delay_by = self.message_delay_failpoint.fire(rng).cloned();
450                if let Some(delay) = delay_by {
451                    effect_builder
452                        .set_timeout(Duration::from_millis(delay))
453                        .event(move |_| {
454                            Event::DelayedIncoming(ConsensusMessageIncoming { sender, message })
455                        })
456                } else {
457                    self.handle_message(effect_builder, rng, sender, *message)
458                }
459            }
460            Event::DelayedIncoming(ConsensusMessageIncoming { sender, message }) => {
461                self.handle_message(effect_builder, rng, sender, *message)
462            }
463            Event::DemandIncoming(ConsensusDemand {
464                sender,
465                request_msg: demand,
466                auto_closing_responder,
467            }) => self.handle_demand(effect_builder, rng, sender, demand, auto_closing_responder),
468            Event::NewBlockPayload(new_block_payload) => {
469                self.handle_new_block_payload(effect_builder, rng, new_block_payload)
470            }
471            Event::BlockAdded {
472                header,
473                header_hash: _,
474            } => self.handle_block_added(effect_builder, rng, *header),
475            Event::ResolveValidity(resolve_validity) => {
476                self.resolve_validity(effect_builder, rng, resolve_validity)
477            }
478            Event::DeactivateEra {
479                era_id,
480                faulty_num,
481                delay,
482            } => self.handle_deactivate_era(effect_builder, era_id, faulty_num, delay),
483            Event::ConsensusRequest(ConsensusRequest::Status(responder)) => self.status(responder),
484            Event::ConsensusRequest(ConsensusRequest::ValidatorChanges(responder)) => {
485                let validator_changes = self.get_validator_changes();
486                responder.respond(validator_changes).ignore()
487            }
488            Event::DumpState(req @ DumpConsensusStateRequest { era_id, .. }) => {
489                let current_era = match self.current_era() {
490                    None => {
491                        return req
492                            .answer(Err(Cow::Owned("consensus not initialized".to_string())))
493                            .ignore()
494                    }
495                    Some(era_id) => era_id,
496                };
497
498                let requested_era = era_id.unwrap_or(current_era);
499
500                // We emit some log message to get some performance information and give the
501                // operator a chance to find out why their node is busy.
502                info!(era_id=%requested_era.value(), was_latest=era_id.is_none(), "dumping era via diagnostics port");
503
504                let era_dump_result = self
505                    .open_eras()
506                    .get(&requested_era)
507                    .ok_or_else(|| {
508                        Cow::Owned(format!(
509                            "could not dump consensus, {} not found",
510                            requested_era
511                        ))
512                    })
513                    .and_then(|era| EraDump::dump_era(era, requested_era));
514
515                match era_dump_result {
516                    Ok(dump) => req.answer(Ok(&dump)).ignore(),
517                    Err(err) => req.answer(Err(err)).ignore(),
518                }
519            }
520        }
521    }
522}