pathfinder_consensus/lib.rs
1//! # Pathfinder Consensus
2//!
3//! A Byzantine Fault Tolerant (BFT) consensus engine for Starknet nodes.
4//!
5//! ## Overview
6//!
7//! This crate provides a consensus engine for Starknet nodes that wraps the
8//! Malachite implementation of the Tendermint BFT consensus algorithm. It's
9//! designed to be generic over validator addresses and consensus values, making
10//! it suitable for Starknet's consensus requirements.
11//!
12//! ## Core Concepts
13//!
14//! ### ValidatorAddress Trait
15//!
16//! Your validator address type must implement the `ValidatorAddress` trait,
17//! which requires:
18//! - `Sync + Send`: Thread-safe and sendable across threads
19//! - `Ord + Display + Debug + Default + Clone`: Standard Rust traits for
20//! ordering, display, debugging, default values, and cloning
21//! - `Into<Vec<u8>>`: Convertible to bytes for serialization
22//! - `Serialize + DeserializeOwned`: Serde serialization support
23//!
24//! ### ValuePayload Trait
25//!
26//! Your consensus value type must implement the `ValuePayload` trait, which
27//! requires:
28//! - `Sync + Send`: Thread-safe and sendable across threads
29//! - `Ord + Display + Debug + Default + Clone`: Standard Rust traits
30//! - `Serialize + DeserializeOwned`: Serde serialization support
31//!
32//! ### Consensus Engine
33//!
34//! The main `Consensus<V, A>` struct is generic over:
35//! - `V`: Your consensus value type (must implement `ValuePayload`)
36//! - `A`: Your validator address type (must implement `ValidatorAddress`)
37//!
38//! ## Usage Example
39//!
40//! ```rust
41//! use pathfinder_consensus::*;
42//! use serde::{Deserialize, Serialize};
43//!
44//! // Define your validator address type
45//! #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
46//! struct MyAddress(String);
47//!
48//! impl std::fmt::Display for MyAddress {
49//! fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50//! write!(f, "{}", self.0)
51//! }
52//! }
53//!
54//! impl From<MyAddress> for Vec<u8> {
55//! fn from(addr: MyAddress) -> Self {
56//! addr.0.into_bytes()
57//! }
58//! }
59//!
60//! // Define your consensus value type
61//! #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
62//! struct BlockData(String);
63//!
64//! impl std::fmt::Display for BlockData {
65//! fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66//! write!(f, "{}", self.0)
67//! }
68//! }
69//!
70//! #[tokio::main]
71//! async fn main() {
72//! // Create configuration
73//! let my_address = MyAddress("validator_1".to_string());
74//! let config = Config::new(my_address.clone());
75//!
76//! // Create consensus engine
77//! let mut consensus = Consensus::new(config);
78//!
79//! // Start consensus at height 1
80//! let validator_set = ValidatorSet::new(vec![Validator::new(
81//! my_address.clone(),
82//! PublicKey::from_bytes([0; 32]),
83//! )]);
84//!
85//! consensus.handle_command(ConsensusCommand::StartHeight(1, validator_set));
86//!
87//! // Poll for events
88//! while let Some(event) = consensus.next_event().await {
89//! match event {
90//! ConsensusEvent::RequestProposal { height, round } => {
91//! println!("Need to propose at height {}, round {}", height, round);
92//! }
93//! ConsensusEvent::Decision { height, value } => {
94//! println!("Consensus reached at height {}: {:?}", height, value);
95//! }
96//! ConsensusEvent::Gossip(message) => {
97//! println!("Need to gossip: {:?}", message);
98//! }
99//! ConsensusEvent::Error(error) => {
100//! eprintln!("Consensus error: {}", error);
101//! }
102//! }
103//! }
104//! }
105//! ```
106//!
107//! ## Commands and Events
108//!
109//! The consensus engine operates on a command/event model:
110//!
111//! - **Commands**: Send commands to the consensus engine via `handle_command()`
112//! - **Events**: Poll for events from the consensus engine via
113//! `next_event().await`
114//!
115//! ## Crash Recovery
116//!
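//! The consensus engine supports crash recovery through write-ahead logging.
//! After a restart, build the engine with `Consensus::recover` instead of
//! `Consensus::new`; it replays the incomplete heights found in the WAL: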
118//!
119//! ```rust
120//! // Recover from a previous crash
121//! let validator_sets = Arc::new(StaticValidatorSetProvider::new(validator_set));
122//! let mut consensus = Consensus::recover(config, validator_sets);
123//! ```
124
125use std::collections::{BTreeMap, HashMap, VecDeque};
126use std::fmt::{Debug, Display};
127use std::ops::{Add, Sub};
128use std::sync::Arc;
129
130use serde::de::DeserializeOwned;
131use serde::{Deserialize, Serialize};
132
133// Re-export consensus types needed by the public API
134pub use crate::config::{Config, TimeoutValues};
135use crate::internal::{InternalConsensus, InternalParams};
136use crate::wal::{FileWalSink, NoopWal, WalSink};
137
138mod config;
139mod internal;
140mod wal;
141
/// A cryptographic signature attached to consensus messages.
///
/// Alias for `malachite_signing_ed25519::Signature`. It is the type of the
/// `signature` field carried by [`SignedProposal`] and [`SignedVote`], where
/// it provides authenticity and integrity for the wrapped message.
pub type Signature = malachite_signing_ed25519::Signature;

/// An Ed25519 signing key.
///
/// Alias for `ed25519_consensus::SigningKey`. Other implementations call this
/// a secret key.
pub type SigningKey = ed25519_consensus::SigningKey;
152
153/// A trait for consensus validator addresses.
154///
155/// This trait defines the requirements for validator address types used in the
156/// consensus engine. Your validator address type must implement all the
157/// required traits to be compatible with the consensus engine.
158///
159/// ## Required Traits
160///
161/// - `Sync + Send`: Thread-safe and sendable across threads
162/// - `Ord + Display + Debug + Default + Clone`: Standard Rust traits for
163/// ordering, display, debugging, default values, and cloning
164/// - `Into<Vec<u8>>`: Convertible to bytes for serialization
165/// - `Serialize + DeserializeOwned`: Serde serialization support
166///
167/// ## Example Implementation
168///
169/// ```rust
170/// #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
171/// struct MyAddress(String);
172///
173/// impl std::fmt::Display for MyAddress {
174/// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175/// write!(f, "{}", self.0)
176/// }
177/// }
178///
179/// impl From<MyAddress> for Vec<u8> {
180/// fn from(addr: MyAddress) -> Self {
181/// addr.0.into_bytes()
182/// }
183/// }
184/// ```
185pub trait ValidatorAddress:
186 Sync + Send + Ord + Display + Debug + Default + Clone + Into<Vec<u8>> + Serialize + DeserializeOwned
187{
188}
189impl<T> ValidatorAddress for T where
190 T: Sync
191 + Send
192 + Ord
193 + Display
194 + Debug
195 + Default
196 + Clone
197 + Into<Vec<u8>>
198 + Serialize
199 + DeserializeOwned
200{
201}
202
203/// A trait for consensus value payloads.
204///
205/// This trait defines the requirements for consensus value types used in the
206/// consensus engine. Your consensus value type must implement all the required
207/// traits to be compatible with the consensus engine.
208///
209/// ## Required Traits
210///
211/// - `Sync + Send`: Thread-safe and sendable across threads
212/// - `Ord + Display + Debug + Default + Clone`: Standard Rust traits
213/// - `Serialize + DeserializeOwned`: Serde serialization support
214///
215/// ## Example Implementation
216///
217/// ```rust
218/// #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
219/// struct BlockData(String);
220///
221/// impl std::fmt::Display for BlockData {
222/// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223/// write!(f, "{}", self.0)
224/// }
225/// }
226/// ```
227pub trait ValuePayload:
228 Sync + Send + Ord + Display + Debug + Default + Clone + Serialize + DeserializeOwned
229{
230}
231impl<T> ValuePayload for T where
232 T: Sync + Send + Ord + Display + Debug + Default + Clone + Serialize + DeserializeOwned
233{
234}
235
/// Pathfinder consensus engine.
///
/// This is the main consensus engine for Starknet nodes that implements
/// Byzantine Fault Tolerant (BFT) consensus using the Malachite implementation
/// of Tendermint. It's generic over validator addresses and consensus values,
/// making it suitable for Starknet's consensus requirements.
///
/// Internally it keeps one `InternalConsensus` per height and a single queue
/// of events collected from all of them.
///
/// ## Generic Parameters
///
/// - `V`: Your consensus value type (must implement `ValuePayload`)
/// - `A`: Your validator address type (must implement `ValidatorAddress`)
///
/// ## Usage
///
/// ```rust
/// let config = Config::new(my_address);
/// let mut consensus = Consensus::new(config);
///
/// // Start consensus at a height
/// consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
///
/// // Poll for events
/// while let Some(event) = consensus.next_event().await {
///     // Handle events
/// }
/// ```
///
/// ## Crash Recovery
///
/// The consensus engine supports crash recovery through write-ahead logging:
///
/// ```rust
/// let validator_sets = Arc::new(StaticValidatorSetProvider::new(validator_set));
/// let mut consensus = Consensus::recover(config, validator_sets);
/// ```
pub struct Consensus<V: ValuePayload + 'static, A: ValidatorAddress + 'static> {
    // One internal engine per height, keyed by that height.
    internal: HashMap<u64, InternalConsensus<V, A>>,
    // Events gathered from the internal engines, handed out in FIFO order.
    event_queue: VecDeque<ConsensusEvent<V, A>>,
    // Configuration shared by all heights (address, WAL directory, timeouts, ...).
    config: Config<A>,
    // Lowest height still kept after the most recent pruning; `None` until
    // pruning has happened at least once.
    min_kept_height: Option<u64>,
}
277
278impl<V: ValuePayload + 'static, A: ValidatorAddress + 'static> Consensus<V, A> {
279 /// Create a new consensus engine for the current validator.
280 ///
281 /// ## Arguments
282 ///
283 /// - `config`: The consensus configuration containing validator address,
284 /// timeouts, and other settings
285 ///
286 /// ## Example
287 ///
288 /// ```rust
289 /// let config = Config::new(my_address);
290 /// let mut consensus = Consensus::new(config);
291 /// ```
292 pub fn new(config: Config<A>) -> Self {
293 Self {
294 internal: HashMap::new(),
295 event_queue: VecDeque::new(),
296 config,
297 min_kept_height: None,
298 }
299 }
300
301 /// Recover recent heights from the write-ahead log.
302 ///
303 /// This method is used to recover consensus state after a crash or restart.
304 /// It reads the write-ahead log and reconstructs the consensus state for
305 /// all incomplete heights.
306 ///
307 /// ## Arguments
308 ///
309 /// - `config`: The consensus configuration
310 /// - `validator_sets`: A provider for validator sets at different heights
311 ///
312 /// ## Example
313 ///
314 /// ```rust
315 /// let validator_sets = Arc::new(StaticValidatorSetProvider::new(validator_set));
316 /// let mut consensus = Consensus::recover(config, validator_sets);
317 /// ```
318 pub fn recover<P: ValidatorSetProvider<A> + 'static>(
319 config: Config<A>,
320 validator_sets: Arc<P>,
321 ) -> Self {
322 use crate::wal::recovery;
323
324 tracing::info!(
325 validator = ?config.address,
326 wal_dir = %config.wal_dir.display(),
327 "Starting consensus recovery from WAL"
328 );
329
330 // Read the write-ahead log and recover all incomplete heights.
331 let incomplete_heights = match recovery::recover_incomplete_heights(&config.wal_dir) {
332 Ok(heights) => {
333 tracing::info!(
334 validator = ?config.address,
335 incomplete_heights = heights.len(),
336 "Found incomplete heights to recover"
337 );
338 heights
339 }
340 Err(e) => {
341 tracing::error!(
342 validator = ?config.address,
343 wal_dir = %config.wal_dir.display(),
344 error = %e,
345 "Failed to recover incomplete heights from WAL"
346 );
347 Vec::new()
348 }
349 };
350
351 // Create a new consensus engine.
352 let mut consensus = Self::new(config);
353
354 // Manually recover all incomplete heights.
355 for (height, entries) in incomplete_heights {
356 tracing::info!(
357 validator = ?consensus.config.address,
358 height = %height,
359 entry_count = entries.len(),
360 "Recovering height from WAL"
361 );
362
363 let validator_set = validator_sets.get_validator_set(height);
364 let mut internal_consensus = consensus.create_consensus(height, &validator_set);
365 internal_consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
366 internal_consensus.recover_from_wal(entries);
367 consensus.internal.insert(height, internal_consensus);
368 }
369
370 tracing::info!(
371 validator = ?consensus.config.address,
372 recovered_heights = consensus.internal.len(),
373 "Completed consensus recovery"
374 );
375
376 consensus
377 }
378
379 fn create_consensus(
380 &mut self,
381 height: u64,
382 validator_set: &ValidatorSet<A>,
383 ) -> InternalConsensus<V, A> {
384 let params = InternalParams {
385 height,
386 validator_set: validator_set.clone(),
387 address: self.config.address.clone(),
388 threshold_params: self.config.threshold_params,
389 value_payload: malachite_types::ValuePayload::ProposalOnly,
390 };
391
392 // Create a WAL for the height. If we fail, use a NoopWal.
393 let wal = match FileWalSink::new(&self.config.address, height, &self.config.wal_dir) {
394 Ok(wal) => Box::new(wal) as Box<dyn WalSink<V, A>>,
395 Err(e) => {
396 tracing::error!(
397 validator = ?self.config.address,
398 height = %height,
399 error = %e,
400 "Failed to create wal for height"
401 );
402 Box::new(NoopWal)
403 }
404 };
405
406 // A new consensus is created for every new height.
407 InternalConsensus::new(params, self.config.timeout_values.clone(), wal)
408 }
409
410 /// Feed a command into the consensus engine.
411 ///
412 /// This method is the primary way to interact with the consensus engine.
413 /// Commands include starting new heights, submitting proposals, and
414 /// processing votes.
415 ///
416 /// ## Arguments
417 ///
418 /// - `cmd`: The command to process
419 ///
420 /// ## Example
421 ///
422 /// ```rust
423 /// // Start a new height
424 /// consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
425 ///
426 /// // Submit a proposal
427 /// consensus.handle_command(ConsensusCommand::Proposal(signed_proposal));
428 ///
429 /// // Process a vote
430 /// consensus.handle_command(ConsensusCommand::Vote(signed_vote));
431 /// ```
432 pub fn handle_command(&mut self, cmd: ConsensusCommand<V, A>) {
433 match cmd {
434 // Start a new height.
435 ConsensusCommand::StartHeight(height, validator_set) => {
436 // A new consensus is created for every new height.
437 let mut consensus = self.create_consensus(height, &validator_set);
438 consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
439 self.internal.insert(height, consensus);
440 tracing::debug!(
441 validator = ?self.config.address,
442 height = %height,
443 "Started new consensus"
444 );
445 }
446 other => {
447 let height = other.height();
448 if let Some(engine) = self.internal.get_mut(&height) {
449 engine.handle_command(other);
450 } else {
451 tracing::warn!(
452 validator = ?self.config.address,
453 height = %height,
454 command = ?other,
455 "Received command for unknown height"
456 );
457 }
458 }
459 }
460 }
461
462 /// Poll all engines for an event.
463 ///
464 /// This method should be called regularly to process events from the
465 /// consensus engine. Events include requests for proposals, decisions,
466 /// gossip messages, and errors.
467 ///
468 /// ## Returns
469 ///
470 /// Returns `Some(event)` if an event is available, or `None` if no events
471 /// are ready.
472 ///
473 /// ## Example
474 ///
475 /// ```rust
476 /// while let Some(event) = consensus.next_event().await {
477 /// match event {
478 /// ConsensusEvent::RequestProposal { height, round } => {
479 /// // Build and submit a proposal
480 /// }
481 /// ConsensusEvent::Decision { height, value } => {
482 /// // Consensus reached, process the value
483 /// }
484 /// ConsensusEvent::Gossip(message) => {
485 /// // Send message to peers
486 /// }
487 /// ConsensusEvent::Error(error) => {
488 /// // Handle error
489 /// }
490 /// }
491 /// }
492 /// ```
493 pub async fn next_event(&mut self) -> Option<ConsensusEvent<V, A>> {
494 let mut finished_heights = Vec::new();
495 // Drain each internal engine.
496 for (height, engine) in self.internal.iter_mut() {
497 if let Some(event) = engine.poll_internal().await {
498 tracing::trace!(
499 validator = ?self.config.address,
500 height = %height,
501 event = ?event,
502 "Engine returned event"
503 );
504 // Track finished heights.
505 if let ConsensusEvent::Decision { height, .. } = &event {
506 finished_heights.push(*height);
507 }
508 // Push the event to the queue.
509 self.event_queue.push_back(event);
510 }
511 }
512
513 // Prune old engines if we have any finished heights.
514 if !finished_heights.is_empty() {
515 self.prune_old_engines();
516 }
517
518 // Return the first event from the queue.
519 self.event_queue.pop_front()
520 }
521
522 /// Prune old engines from the internal map.
523 fn prune_old_engines(&mut self) {
524 let max_height = self.internal.keys().max().copied();
525 if let Some(max_height) = max_height {
526 let new_min_height = max_height.checked_sub(self.config.history_depth);
527
528 if let Some(new_min) = new_min_height {
529 self.min_kept_height = Some(new_min);
530 self.internal.retain(|height, _| *height >= new_min);
531
532 tracing::debug!(
533 validator = ?self.config.address,
534 min_height = %new_min,
535 max_height = %max_height,
536 "Pruned old consensus engines"
537 );
538 }
539 }
540 }
541
542 /// Check if a specific height has been finalized (i.e., a decision has been
543 /// reached)
544 ///
545 /// ## Arguments
546 ///
547 /// - `height`: The height to check
548 ///
549 /// ## Returns
550 ///
551 /// Returns `true` if the height has been finalized, `false` otherwise.
552 ///
553 /// ## Example
554 ///
555 /// ```rust
556 /// if consensus.is_height_finalized(height) {
557 /// println!("Height {} has been finalized", height);
558 /// }
559 /// ```
560 pub fn is_height_finalized(&self, height: u64) -> bool {
561 if let Some(engine) = self.internal.get(&height) {
562 engine.is_finalized()
563 } else {
564 // If the height is not in our internal map, it might have been pruned
565 // after being finalized, so we assume it's finalized
566 if let Some(min_height) = self.min_kept_height {
567 if height < min_height {
568 return true;
569 }
570 }
571 false
572 }
573 }
574}
575
/// A round number (or `None` if the round is nil).
///
/// This type represents a consensus round number. A round can be either a
/// specific round number or nil (None), which represents a special state in the
/// consensus protocol.
///
/// The derived ordering follows `Option<u32>`: the nil round sorts before
/// every numbered round. The type is `Clone` but not `Copy`.
///
/// ## Example
///
/// ```rust
/// let round = Round::new(5); // Round 5
/// let nil_round = Round::nil(); // Nil round
/// ```
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Round(pub Option<u32>);
590
591impl Round {
592 /// Create a new round with the given round number.
593 pub fn new(round: u32) -> Self {
594 Self(Some(round))
595 }
596
597 /// Create a nil round.
598 pub fn nil() -> Self {
599 Self(None)
600 }
601
602 /// Get the round number as a u32, if it's not nil.
603 pub fn as_u32(&self) -> Option<u32> {
604 self.0
605 }
606}
607
608impl From<u32> for Round {
609 fn from(round: u32) -> Self {
610 Self::new(round)
611 }
612}
613
614impl Add<u32> for Round {
615 type Output = Self;
616
617 fn add(self, rhs: u32) -> Self::Output {
618 Self(self.0.map(|round| round + rhs))
619 }
620}
621
622impl Sub<u32> for Round {
623 type Output = Self;
624
625 fn sub(self, rhs: u32) -> Self::Output {
626 Self(self.0.map(|round| round - rhs))
627 }
628}
629
630impl Display for Round {
631 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
632 match self.0 {
633 Some(round) => write!(f, "{round}"),
634 None => write!(f, "Nil"),
635 }
636 }
637}
638
639impl Debug for Round {
640 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
641 write!(f, "{self}")
642 }
643}
644
/// A proposal for a block value in a consensus round.
///
/// A proposal is created by the designated proposer for a given height and
/// round. It contains the proposed block value along with additional metadata.
/// This is the unsigned form; [`SignedProposal`] pairs it with a signature.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Proposal<V, A> {
    /// The blockchain height
    pub height: u64,
    /// The consensus round number
    pub round: Round,
    /// The proposed consensus value
    pub value: V,
    /// The POL round this proposal refers to
    pub pol_round: Round,
    /// The address of the proposer
    pub proposer: A,
}
662
663impl<V: Debug, A: Debug> std::fmt::Debug for Proposal<V, A> {
664 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
665 write!(
666 f,
667 "H:{} R:{} From:{:?} Val:{:?}",
668 self.height, self.round, self.proposer, self.value
669 )
670 }
671}
672
/// The type of vote.
///
/// Stored in the `type` field of [`Vote`]. The derived ordering places
/// `Prevote` before `Precommit`, following declaration order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum VoteType {
    /// A preliminary vote
    Prevote,
    /// A final vote that commits to a value
    Precommit,
}
681
/// A vote for a value in a consensus round.
///
/// A vote is cast by a validator to indicate their agreement or disagreement
/// with a proposed block value. The vote includes the validator's address, the
/// round number, and the block value being voted on. This is the unsigned
/// form; [`SignedVote`] pairs it with a signature.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Vote<V, A> {
    /// The type of vote (Prevote or Precommit)
    pub r#type: VoteType,
    /// The blockchain height
    pub height: u64,
    /// The consensus round number
    pub round: Round,
    /// The value being voted on (None for nil votes)
    pub value: Option<V>,
    /// The address of the validator casting the vote
    pub validator_address: A,
}
700
701impl<V: Debug, A: Debug> std::fmt::Debug for Vote<V, A> {
702 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
703 let val = match &self.value {
704 Some(val) => format!("{val:?}"),
705 None => "Nil".to_string(),
706 };
707 write!(
708 f,
709 "H:{} R:{} {:?} From:{:?} Val:{val}",
710 self.height, self.round, self.r#type, self.validator_address
711 )
712 }
713}
714
715impl<V, A> Vote<V, A> {
716 /// Check if the vote is nil.
717 ///
718 /// A nil vote is a vote that does not commit to a value.
719 pub fn is_nil(&self) -> bool {
720 self.value.is_none()
721 }
722}
723
/// A fully validated, signed proposal ready to enter consensus.
///
/// This type wraps a proposal with a cryptographic signature to ensure
/// authenticity and integrity.
#[derive(Clone, Serialize, Deserialize)]
pub struct SignedProposal<V, A> {
    /// The proposal being signed
    pub proposal: Proposal<V, A>,
    /// The signature accompanying `proposal`
    pub signature: Signature,
}
733
/// A signed vote.
///
/// This type wraps a vote with a cryptographic signature to ensure
/// authenticity and integrity.
#[derive(Clone, Serialize, Deserialize)]
pub struct SignedVote<V, A> {
    /// The vote being signed
    pub vote: Vote<V, A>,
    /// The signature accompanying `vote`
    pub signature: Signature,
}
743
744// Note: We intentionally ignore the signature as it's not used yet.
745impl<V: Debug, A: Debug> std::fmt::Debug for SignedProposal<V, A> {
746 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
747 write!(f, "{:?}", self.proposal)
748 }
749}
750
751// Note: We intentionally ignore the signature as it's not used yet.
752impl<V: Debug, A: Debug> std::fmt::Debug for SignedVote<V, A> {
753 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
754 write!(f, "{:?}", self.vote)
755 }
756}
757
/// A public key for the consensus protocol.
///
/// Alias for `malachite_signing_ed25519::PublicKey`. This type is used to
/// verify signatures on proposals and votes in the consensus protocol. Each
/// validator has an associated public key that is used to authenticate their
/// messages.
pub type PublicKey = malachite_signing_ed25519::PublicKey;

/// A validator's voting power, i.e. its weight in consensus decisions.
pub type VotingPower = u64;
767
/// A validator in the consensus protocol.
///
/// Each validator has an associated address and public key to uniquely identify
/// them. The voting power determines their weight in consensus decisions.
///
/// The derived ordering compares fields in declaration order: address first,
/// then public key, then voting power.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Validator<A> {
    /// The validator's address
    pub address: A,
    /// The validator's public key for signature verification
    pub public_key: PublicKey,
    /// The validator's voting power (weight in consensus)
    pub voting_power: VotingPower,
}
781
782impl<A: Debug> std::fmt::Debug for Validator<A> {
783 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
784 write!(f, "{:?} ({})", self.address, self.voting_power)
785 }
786}
787
788impl<A> Validator<A> {
789 /// Create a new validator with the given address and public key.
790 ///
791 /// The voting power defaults to 1.
792 pub fn new(address: A, public_key: PublicKey) -> Self {
793 Self {
794 address,
795 public_key,
796 voting_power: 1,
797 }
798 }
799
800 /// Set the voting power for the validator.
801 ///
802 /// This method returns `self` for method chaining.
803 pub fn with_voting_power(mut self, voting_power: VotingPower) -> Self {
804 self.voting_power = voting_power;
805 self
806 }
807}
808
/// A validator set represents a group of consensus participants.
///
/// The validator set defines who can participate in consensus at a given
/// height. Each validator in the set has a voting power that determines their
/// weight in consensus decisions.
///
/// A set built with `ValidatorSet::new` holds one validator per address,
/// ordered by address. The field is public, so a value constructed directly
/// is not subject to that guarantee.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ValidatorSet<A> {
    /// The list of validators in the set
    pub validators: Vec<Validator<A>>,
}
819
820impl<A: Clone + Ord> ValidatorSet<A> {
821 /// Create a new validator set with the given validators.
822 pub fn new(validators: impl IntoIterator<Item = Validator<A>>) -> Self {
823 // Ensure validators are unique by address.
824 let validators: BTreeMap<A, Validator<A>> = validators
825 .into_iter()
826 .map(|v| (v.address.clone(), v))
827 .collect();
828 assert!(!validators.is_empty());
829 let validators = validators.into_values().collect();
830 Self { validators }
831 }
832
833 /// Get the number of validators in the set.
834 pub fn count(&self) -> usize {
835 self.validators.len()
836 }
837}
838
/// Commands that the application can send into the consensus engine.
///
/// These commands represent the primary interface for interacting with the
/// consensus engine. They allow the application to start new heights,
/// submit proposals, and process votes.
pub enum ConsensusCommand<V, A> {
    /// Start consensus at a given height with the validator set.
    StartHeight(u64, ValidatorSet<A>),
    /// A complete, locally validated proposal that we create as the proposer
    /// for the current round. Unlike `Proposal`, the payload carries no
    /// signature.
    Propose(Proposal<V, A>),
    /// A complete, locally validated and signed proposal that was received over
    /// the network from another validator.
    Proposal(SignedProposal<V, A>),
    /// A signed vote received from the network.
    Vote(SignedVote<V, A>),
}
856
857impl<V, A> ConsensusCommand<V, A> {
858 /// Get the consensus height associated with the command.
859 pub fn height(&self) -> u64 {
860 match self {
861 ConsensusCommand::StartHeight(height, _) => *height,
862 ConsensusCommand::Propose(proposal) => proposal.height,
863 ConsensusCommand::Proposal(proposal) => proposal.proposal.height,
864 ConsensusCommand::Vote(vote) => vote.vote.height,
865 }
866 }
867}
868
869impl<V: Debug, A: Debug> std::fmt::Debug for ConsensusCommand<V, A> {
870 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
871 match self {
872 ConsensusCommand::StartHeight(height, validator_set) => write!(
873 f,
874 "StartHeight({}, [{}])",
875 height,
876 validator_set
877 .validators
878 .iter()
879 .map(|v| format!("{:?}", v.address))
880 .collect::<Vec<_>>()
881 .join(", ")
882 ),
883 ConsensusCommand::Propose(proposal) => write!(f, "Propose({proposal:?})"),
884 ConsensusCommand::Proposal(proposal) => write!(f, "Proposal({proposal:?})"),
885 ConsensusCommand::Vote(vote) => write!(f, "Vote({vote:?})"),
886 }
887 }
888}
889
/// A message to be gossiped to peers.
///
/// These messages represent network communication that needs to be sent to
/// other validators in the network. They are the payload of
/// `ConsensusEvent::Gossip`.
#[derive(Clone, Debug)]
pub enum NetworkMessage<V, A> {
    /// A complete, locally validated and signed proposal ready to be gossiped.
    Proposal(SignedProposal<V, A>),
    /// A signed vote.
    // NOTE(review): this variant was previously described as "received from
    // the network", while the enum as a whole is documented as outgoing
    // gossip; confirm which direction is intended.
    Vote(SignedVote<V, A>),
}
901
/// Events that the consensus engine emits for the application to handle.
///
/// These events represent the output of the consensus engine and tell the
/// application what actions it needs to take.
pub enum ConsensusEvent<V, A> {
    /// The consensus wants this message to be gossiped to peers.
    ///
    /// The application should send this message to all peers in the network.
    Gossip(NetworkMessage<V, A>),
    /// The consensus needs the app to build and inject a proposal.
    ///
    /// The application should create a proposal for the given height and round,
    /// then submit it to the consensus engine.
    RequestProposal { height: u64, round: u32 },
    /// The consensus has reached a decision and committed a block.
    ///
    /// This event indicates that consensus has been reached for the given
    /// height and the value should be committed to the blockchain.
    Decision { height: u64, value: V },
    /// An internal error occurred in consensus.
    ///
    /// The application should handle this error appropriately, possibly by
    /// logging it or taking corrective action.
    Error(anyhow::Error),
}
927
928impl<V: Debug, A: Debug> std::fmt::Debug for ConsensusEvent<V, A> {
929 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
930 match self {
931 ConsensusEvent::Gossip(msg) => match msg {
932 NetworkMessage::Proposal(proposal) => write!(f, "Gossip(Proposal({proposal:?}))"),
933 NetworkMessage::Vote(vote) => write!(f, "Gossip(Vote({vote:?}))"),
934 },
935 ConsensusEvent::RequestProposal { height, round, .. } => {
936 write!(f, "RequestProposal(H:{height} R:{round})")
937 }
938 ConsensusEvent::Decision { height, value } => {
939 write!(f, "Decision(H:{height} Val:{value:?})")
940 }
941 ConsensusEvent::Error(error) => write!(f, "Error({error:?})"),
942 }
943 }
944}
945
/// A trait for retrieving the validator set at a specific blockchain height.
///
/// This trait allows consensus to dynamically determine the set of validators
/// that are eligible to participate in consensus at any given height.
///
/// This is useful for handling validator set changes across heights.
/// `Consensus::recover` uses a provider to obtain the validator set for each
/// height it restores. Implementors must be `Send + Sync`.
pub trait ValidatorSetProvider<A>: Send + Sync {
    /// Get the validator set for the given height.
    ///
    /// ## Arguments
    ///
    /// - `height`: The blockchain height
    ///
    /// ## Returns
    ///
    /// Returns the validator set for the given height. The signature has no
    /// way to report a missing set, so an implementation must always produce
    /// one.
    fn get_validator_set(&self, height: u64) -> ValidatorSet<A>;
}
964
965/// A validator set provider that always returns the same validator set.
966///
967/// This is a simple implementation of `ValidatorSetProvider` that returns
968/// the same validator set for all heights. This is useful for testing or
969/// for applications where the validator set doesn't change.
970pub struct StaticValidatorSetProvider<A> {
971 validator_set: ValidatorSet<A>,
972}
973
974impl<A> StaticValidatorSetProvider<A> {
975 /// Create a new static validator set provider.
976 ///
977 /// ## Arguments
978 ///
979 /// - `validator_set`: The validator set to return for all heights
980 pub fn new(validator_set: ValidatorSet<A>) -> Self {
981 Self { validator_set }
982 }
983}
984
985impl<A: Clone + Send + Sync> ValidatorSetProvider<A> for StaticValidatorSetProvider<A> {
986 fn get_validator_set(&self, _height: u64) -> ValidatorSet<A> {
987 self.validator_set.clone()
988 }
989}
990
#[cfg(test)]
mod tests {
    use super::*;

    /// Building a set from a list with repeated addresses must keep exactly
    /// one validator per address, ordered by address.
    #[test]
    fn regression_validator_set_is_unique_by_address() {
        let addresses = [1, 1, 2, 2, 2, 3, 3, 3, 3, 2, 1, 1, 2, 3, 2, 2, 1, 1, 3, 3];
        let with_duplicates = addresses
            .iter()
            .map(|&address| Validator::new(address, crate::PublicKey::from_bytes([0; 32])));

        let set = ValidatorSet::new(with_duplicates);
        let unique_addresses: Vec<_> = set.validators.iter().map(|v| v.address).collect();

        assert_eq!(unique_addresses, vec![1, 2, 3]);
    }
}
1007}