1#![warn(clippy::arithmetic_side_effects)]
4
5mod cl_context;
6mod config;
7mod consensus_protocol;
8mod era_supervisor;
9#[macro_use]
10pub mod highway_core;
11pub(crate) mod error;
12mod leader_sequence;
13mod metrics;
14pub mod protocols;
15#[cfg(test)]
16pub(crate) mod tests;
17mod traits;
18pub mod utils;
19mod validator_change;
20
21use std::{
22 borrow::Cow,
23 fmt::{self, Debug, Display, Formatter},
24 sync::Arc,
25 time::Duration,
26};
27
28use datasize::DataSize;
29use derive_more::From;
30use serde::{Deserialize, Serialize};
31use tracing::{info, trace};
32
33use casper_types::{BlockHash, BlockHeader, EraId, Timestamp};
34
35use crate::{
36 components::Component,
37 effect::{
38 announcements::{
39 ConsensusAnnouncement, FatalAnnouncement, MetaBlockAnnouncement,
40 PeerBehaviorAnnouncement,
41 },
42 diagnostics_port::DumpConsensusStateRequest,
43 incoming::{ConsensusDemand, ConsensusMessageIncoming},
44 requests::{
45 BlockValidationRequest, ChainspecRawBytesRequest, ConsensusRequest,
46 ContractRuntimeRequest, NetworkInfoRequest, NetworkRequest, StorageRequest,
47 TransactionBufferRequest,
48 },
49 EffectBuilder, EffectExt, Effects,
50 },
51 failpoints::FailpointActivation,
52 protocol::Message,
53 reactor::ReactorEvent,
54 types::{BlockPayload, InvalidProposalError, NodeId},
55 NodeRng,
56};
57use protocols::{highway::HighwayProtocol, zug::Zug};
58use traits::Context;
59
60pub use cl_context::ClContext;
61pub(crate) use config::{ChainspecConsensusExt, Config};
62pub(crate) use consensus_protocol::{BlockContext, ProposedBlock};
63pub(crate) use era_supervisor::{debug::EraDump, EraSupervisor, SerializedMessage};
64#[cfg(test)]
65pub(crate) use highway_core::highway::Vertex as HighwayVertex;
66pub(crate) use leader_sequence::LeaderSequence;
67pub(crate) use protocols::highway::max_rounds_per_era;
68#[cfg(test)]
69pub(crate) use protocols::highway::HighwayMessage;
70
/// Name reported by the consensus component through `Component::name`.
const COMPONENT_NAME: &str = "consensus";
72
73#[allow(clippy::arithmetic_side_effects)]
74mod relaxed {
75 use casper_types::{EraId, PublicKey};
79 use datasize::DataSize;
80 use serde::{Deserialize, Serialize};
81 use strum::EnumDiscriminants;
82
83 use super::era_supervisor::SerializedMessage;
84
85 #[derive(DataSize, Clone, Serialize, Deserialize, EnumDiscriminants)]
86 #[strum_discriminants(derive(strum::EnumIter))]
87 pub(crate) enum ConsensusMessage {
88 Protocol {
90 era_id: EraId,
91 payload: SerializedMessage,
92 },
93 EvidenceRequest { era_id: EraId, pub_key: PublicKey },
96 }
97}
98pub(crate) use relaxed::{ConsensusMessage, ConsensusMessageDiscriminants};
99
100#[derive(DataSize, Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, From)]
102pub(crate) enum EraRequest<C>
103where
104 C: Context,
105{
106 Zug(protocols::zug::SyncRequest<C>),
107}
108
109#[derive(DataSize, Clone, Serialize, Deserialize)]
111pub(crate) struct ConsensusRequestMessage {
112 era_id: EraId,
113 payload: SerializedMessage,
114}
115
116#[derive(DataSize, Clone, Copy, Debug, Eq, PartialEq, Hash)]
119pub struct TimerId(pub u8);
120
121#[derive(DataSize, Clone, Copy, Debug, Eq, PartialEq, Hash)]
124pub struct ActionId(pub u8);
125
126#[derive(DataSize, Debug, From)]
128pub struct NewBlockPayload {
129 pub(crate) era_id: EraId,
130 pub(crate) block_payload: Arc<BlockPayload>,
131 pub(crate) block_context: BlockContext<ClContext>,
132}
133
134#[derive(DataSize, Debug, From)]
136pub struct ResolveValidity {
137 era_id: EraId,
138 sender: NodeId,
139 proposed_block: ProposedBlock<ClContext>,
140 maybe_error: Option<Box<InvalidProposalError>>,
141}
142
143#[derive(DataSize, Debug, From)]
145pub(crate) enum Event {
146 #[from]
148 Incoming(ConsensusMessageIncoming),
149 DelayedIncoming(ConsensusMessageIncoming),
152 #[from]
154 DemandIncoming(ConsensusDemand),
155 Timer {
157 era_id: EraId,
158 timestamp: Timestamp,
159 timer_id: TimerId,
160 },
161 Action { era_id: EraId, action_id: ActionId },
163 NewBlockPayload(NewBlockPayload),
165 #[from]
166 ConsensusRequest(ConsensusRequest),
167 BlockAdded {
169 header: Box<BlockHeader>,
170 header_hash: BlockHash,
171 },
172 ResolveValidity(ResolveValidity),
174 DeactivateEra {
176 era_id: EraId,
177 faulty_num: usize,
178 delay: Duration,
179 },
180 #[from]
182 DumpState(DumpConsensusStateRequest),
183}
184
185impl Debug for ConsensusMessage {
186 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
187 match self {
188 ConsensusMessage::Protocol { era_id, payload: _ } => {
189 write!(f, "Protocol {{ era_id: {:?}, .. }}", era_id)
190 }
191 ConsensusMessage::EvidenceRequest { era_id, pub_key } => f
192 .debug_struct("EvidenceRequest")
193 .field("era_id", era_id)
194 .field("pub_key", pub_key)
195 .finish(),
196 }
197 }
198}
199
200impl Display for ConsensusMessage {
201 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
202 match self {
203 ConsensusMessage::Protocol { era_id, payload } => {
204 write!(
205 f,
206 "protocol message ({} bytes) in {}",
207 payload.as_raw().len(),
208 era_id
209 )
210 }
211 ConsensusMessage::EvidenceRequest { era_id, pub_key } => write!(
212 f,
213 "request for evidence of fault by {} in {} or earlier",
214 pub_key, era_id,
215 ),
216 }
217 }
218}
219
220impl Debug for ConsensusRequestMessage {
221 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
222 write!(
223 f,
224 "ConsensusRequestMessage {{ era_id: {:?}, .. }}",
225 self.era_id
226 )
227 }
228}
229
230impl Display for ConsensusRequestMessage {
231 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
232 write!(f, "protocol request {:?} in {}", self.payload, self.era_id)
233 }
234}
235
236impl Display for Event {
237 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
238 match self {
239 Event::Incoming(ConsensusMessageIncoming { sender, message }) => {
240 write!(f, "message from {:?}: {}", sender, message)
241 }
242 Event::DelayedIncoming(ConsensusMessageIncoming { sender, message }) => {
243 write!(f, "delayed message from {:?}: {}", sender, message)
244 }
245 Event::DemandIncoming(demand) => {
246 write!(f, "demand from {:?}: {}", demand.sender, demand.request_msg)
247 }
248 Event::Timer {
249 era_id,
250 timestamp,
251 timer_id,
252 } => write!(
253 f,
254 "timer (ID {}) for {} scheduled for timestamp {}",
255 timer_id.0, era_id, timestamp,
256 ),
257 Event::Action { era_id, action_id } => {
258 write!(f, "action (ID {}) for {}", action_id.0, era_id)
259 }
260 Event::NewBlockPayload(NewBlockPayload {
261 era_id,
262 block_payload,
263 block_context,
264 }) => write!(
265 f,
266 "New proposed block for era {:?}: {:?}, {:?}",
267 era_id, block_payload, block_context
268 ),
269 Event::ConsensusRequest(request) => write!(
270 f,
271 "A request for consensus component hash been received: {:?}",
272 request
273 ),
274 Event::BlockAdded {
275 header: _,
276 header_hash,
277 } => write!(
278 f,
279 "A block has been added to the linear chain: {}",
280 header_hash,
281 ),
282 Event::ResolveValidity(ResolveValidity {
283 era_id,
284 sender,
285 proposed_block,
286 maybe_error,
287 }) => write!(
288 f,
289 "Proposed block received from {:?} for {} is {}: {:?}",
290 sender,
291 era_id,
292 if maybe_error.is_none() {
293 "valid".to_string()
294 } else {
295 format!("invalid ({:?})", maybe_error).to_string()
296 },
297 proposed_block,
298 ),
299 Event::DeactivateEra {
300 era_id, faulty_num, ..
301 } => write!(
302 f,
303 "Deactivate old {} unless additional faults are observed; faults so far: {}",
304 era_id, faulty_num
305 ),
306 Event::DumpState(req) => Display::fmt(req, f),
307 }
308 }
309}
310
311pub(crate) trait ReactorEventT:
314 ReactorEvent
315 + From<Event>
316 + Send
317 + From<NetworkRequest<Message>>
318 + From<ConsensusDemand>
319 + From<NetworkInfoRequest>
320 + From<TransactionBufferRequest>
321 + From<ConsensusAnnouncement>
322 + From<BlockValidationRequest>
323 + From<StorageRequest>
324 + From<ContractRuntimeRequest>
325 + From<ChainspecRawBytesRequest>
326 + From<PeerBehaviorAnnouncement>
327 + From<MetaBlockAnnouncement>
328 + From<FatalAnnouncement>
329{
330}
331
332impl<REv> ReactorEventT for REv where
333 REv: ReactorEvent
334 + From<Event>
335 + Send
336 + From<ConsensusDemand>
337 + From<NetworkRequest<Message>>
338 + From<NetworkInfoRequest>
339 + From<TransactionBufferRequest>
340 + From<ConsensusAnnouncement>
341 + From<BlockValidationRequest>
342 + From<StorageRequest>
343 + From<ContractRuntimeRequest>
344 + From<ChainspecRawBytesRequest>
345 + From<PeerBehaviorAnnouncement>
346 + From<MetaBlockAnnouncement>
347 + From<FatalAnnouncement>
348{
349}
350
351mod specimen_support {
352 use crate::utils::specimen::{largest_variant, Cache, LargestSpecimen, SizeEstimator};
353
354 use super::{
355 protocols::{highway, zug},
356 ClContext, ConsensusMessage, ConsensusMessageDiscriminants, ConsensusRequestMessage,
357 EraRequest, SerializedMessage,
358 };
359
360 impl LargestSpecimen for ConsensusMessage {
361 fn largest_specimen<E: SizeEstimator>(estimator: &E, cache: &mut Cache) -> Self {
362 largest_variant::<Self, ConsensusMessageDiscriminants, _, _>(estimator, |variant| {
363 match variant {
364 ConsensusMessageDiscriminants::Protocol => {
365 let zug_payload = SerializedMessage::from_message(
366 &zug::Message::<ClContext>::largest_specimen(estimator, cache),
367 );
368 let highway_payload = SerializedMessage::from_message(
369 &highway::HighwayMessage::<ClContext>::largest_specimen(
370 estimator, cache,
371 ),
372 );
373
374 let payload = if zug_payload.as_raw().len() > highway_payload.as_raw().len()
375 {
376 zug_payload
377 } else {
378 highway_payload
379 };
380
381 ConsensusMessage::Protocol {
382 era_id: LargestSpecimen::largest_specimen(estimator, cache),
383 payload,
384 }
385 }
386 ConsensusMessageDiscriminants::EvidenceRequest => {
387 ConsensusMessage::EvidenceRequest {
388 era_id: LargestSpecimen::largest_specimen(estimator, cache),
389 pub_key: LargestSpecimen::largest_specimen(estimator, cache),
390 }
391 }
392 }
393 })
394 }
395 }
396
397 impl LargestSpecimen for ConsensusRequestMessage {
398 fn largest_specimen<E: SizeEstimator>(estimator: &E, cache: &mut Cache) -> Self {
399 let zug_sync_request = SerializedMessage::from_message(
400 &zug::SyncRequest::<ClContext>::largest_specimen(estimator, cache),
401 );
402
403 ConsensusRequestMessage {
404 era_id: LargestSpecimen::largest_specimen(estimator, cache),
405 payload: zug_sync_request,
406 }
407 }
408 }
409
410 impl LargestSpecimen for EraRequest<ClContext> {
411 fn largest_specimen<E: SizeEstimator>(estimator: &E, cache: &mut Cache) -> Self {
412 EraRequest::Zug(LargestSpecimen::largest_specimen(estimator, cache))
413 }
414 }
415}
416
417impl<REv> Component<REv> for EraSupervisor
418where
419 REv: ReactorEventT,
420{
421 type Event = Event;
422
423 fn name(&self) -> &str {
424 COMPONENT_NAME
425 }
426
427 fn activate_failpoint(&mut self, activation: &FailpointActivation) {
428 self.message_delay_failpoint.update_from(activation);
429 self.proposal_delay_failpoint.update_from(activation);
430 }
431
432 fn handle_event(
433 &mut self,
434 effect_builder: EffectBuilder<REv>,
435 rng: &mut NodeRng,
436 event: Self::Event,
437 ) -> Effects<Self::Event> {
438 trace!("{:?}", event);
439 match event {
440 Event::Timer {
441 era_id,
442 timestamp,
443 timer_id,
444 } => self.handle_timer(effect_builder, rng, era_id, timestamp, timer_id),
445 Event::Action { era_id, action_id } => {
446 self.handle_action(effect_builder, rng, era_id, action_id)
447 }
448 Event::Incoming(ConsensusMessageIncoming { sender, message }) => {
449 let delay_by = self.message_delay_failpoint.fire(rng).cloned();
450 if let Some(delay) = delay_by {
451 effect_builder
452 .set_timeout(Duration::from_millis(delay))
453 .event(move |_| {
454 Event::DelayedIncoming(ConsensusMessageIncoming { sender, message })
455 })
456 } else {
457 self.handle_message(effect_builder, rng, sender, *message)
458 }
459 }
460 Event::DelayedIncoming(ConsensusMessageIncoming { sender, message }) => {
461 self.handle_message(effect_builder, rng, sender, *message)
462 }
463 Event::DemandIncoming(ConsensusDemand {
464 sender,
465 request_msg: demand,
466 auto_closing_responder,
467 }) => self.handle_demand(effect_builder, rng, sender, demand, auto_closing_responder),
468 Event::NewBlockPayload(new_block_payload) => {
469 self.handle_new_block_payload(effect_builder, rng, new_block_payload)
470 }
471 Event::BlockAdded {
472 header,
473 header_hash: _,
474 } => self.handle_block_added(effect_builder, rng, *header),
475 Event::ResolveValidity(resolve_validity) => {
476 self.resolve_validity(effect_builder, rng, resolve_validity)
477 }
478 Event::DeactivateEra {
479 era_id,
480 faulty_num,
481 delay,
482 } => self.handle_deactivate_era(effect_builder, era_id, faulty_num, delay),
483 Event::ConsensusRequest(ConsensusRequest::Status(responder)) => self.status(responder),
484 Event::ConsensusRequest(ConsensusRequest::ValidatorChanges(responder)) => {
485 let validator_changes = self.get_validator_changes();
486 responder.respond(validator_changes).ignore()
487 }
488 Event::DumpState(req @ DumpConsensusStateRequest { era_id, .. }) => {
489 let current_era = match self.current_era() {
490 None => {
491 return req
492 .answer(Err(Cow::Owned("consensus not initialized".to_string())))
493 .ignore()
494 }
495 Some(era_id) => era_id,
496 };
497
498 let requested_era = era_id.unwrap_or(current_era);
499
500 info!(era_id=%requested_era.value(), was_latest=era_id.is_none(), "dumping era via diagnostics port");
503
504 let era_dump_result = self
505 .open_eras()
506 .get(&requested_era)
507 .ok_or_else(|| {
508 Cow::Owned(format!(
509 "could not dump consensus, {} not found",
510 requested_era
511 ))
512 })
513 .and_then(|era| EraDump::dump_era(era, requested_era));
514
515 match era_dump_result {
516 Ok(dump) => req.answer(Ok(&dump)).ignore(),
517 Err(err) => req.answer(Err(err)).ignore(),
518 }
519 }
520 }
521 }
522}