pathfinder_consensus/
lib.rs

1//! # Pathfinder Consensus
2//!
3//! A Byzantine Fault Tolerant (BFT) consensus engine for Starknet nodes.
4//!
5//! ## Overview
6//!
7//! This crate provides a consensus engine for Starknet nodes that wraps the
8//! Malachite implementation of the Tendermint BFT consensus algorithm. It's
9//! designed to be generic over validator addresses and consensus values, making
10//! it suitable for Starknet's consensus requirements.
11//!
12//! ## Core Concepts
13//!
14//! ### ValidatorAddress Trait
15//!
16//! Your validator address type must implement the `ValidatorAddress` trait,
17//! which requires:
18//! - `Sync + Send`: Thread-safe and sendable across threads
19//! - `Ord + Display + Debug + Default + Clone`: Standard Rust traits for
20//!   ordering, display, debugging, default values, and cloning
21//! - `Into<Vec<u8>>`: Convertible to bytes for serialization
22//! - `Serialize + DeserializeOwned`: Serde serialization support
23//!
24//! ### ValuePayload Trait
25//!
26//! Your consensus value type must implement the `ValuePayload` trait, which
27//! requires:
28//! - `Sync + Send`: Thread-safe and sendable across threads
29//! - `Ord + Display + Debug + Default + Clone`: Standard Rust traits
30//! - `Serialize + DeserializeOwned`: Serde serialization support
31//!
32//! ### Consensus Engine
33//!
34//! The main `Consensus<V, A>` struct is generic over:
35//! - `V`: Your consensus value type (must implement `ValuePayload`)
36//! - `A`: Your validator address type (must implement `ValidatorAddress`)
37//!
38//! ## Usage Example
39//!
40//! ```rust
41//! use pathfinder_consensus::*;
42//! use serde::{Deserialize, Serialize};
43//!
44//! // Define your validator address type
45//! #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
46//! struct MyAddress(String);
47//!
48//! impl std::fmt::Display for MyAddress {
49//!     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50//!         write!(f, "{}", self.0)
51//!     }
52//! }
53//!
54//! impl From<MyAddress> for Vec<u8> {
55//!     fn from(addr: MyAddress) -> Self {
56//!         addr.0.into_bytes()
57//!     }
58//! }
59//!
60//! // Define your consensus value type
61//! #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
62//! struct BlockData(String);
63//!
64//! impl std::fmt::Display for BlockData {
65//!     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66//!         write!(f, "{}", self.0)
67//!     }
68//! }
69//!
70//! // Custom proposer selector that uses weighted selection
71//! #[derive(Clone)]
72//! struct WeightedProposerSelector;
73//!
74//! impl ProposerSelector<MyAddress> for WeightedProposerSelector {
75//!     fn select_proposer<'a>(
76//!         &self,
77//!         validator_set: &'a ValidatorSet<MyAddress>,
78//!         _height: u64,
79//!         round: u32,
80//!     ) -> &'a Validator<MyAddress> {
81//!         // Simple weighted selection based on voting power
82//!         let total_power: u64 = validator_set
83//!             .validators
84//!             .iter()
85//!             .map(|v| v.voting_power)
86//!             .sum();
87//!         let selection = (round as u64) % total_power;
88//!
89//!         let mut cumulative = 0;
90//!         for validator in &validator_set.validators {
91//!             cumulative += validator.voting_power;
92//!             if selection < cumulative {
93//!                 return validator;
94//!             }
95//!         }
96//!
97//!         // Fallback to first validator
98//!         &validator_set.validators[0]
99//!     }
100//! }
101//!
102//! #[tokio::main]
103//! async fn main() {
104//!     // Create configuration
105//!     let my_address = MyAddress("validator_1".to_string());
106//!     let config = Config::new(my_address.clone());
107//!
108//!     // Create consensus engine with custom proposer selector
109//!     let proposer_selector = WeightedProposerSelector;
110//!     let mut consensus = Consensus::new(config).with_proposer_selector(proposer_selector);
111//!
112//!     // Or use the default round-robin selector (no additional configuration needed)
113//!     // let mut consensus = Consensus::new(config);
114//!
115//!     // Start consensus at height 1
116//!     let validator_set = ValidatorSet::new(vec![Validator::new(
117//!         my_address.clone(),
118//!         PublicKey::from_bytes([0; 32]),
119//!     )
120//!     .with_voting_power(10)]);
121//!
122//!     consensus.handle_command(ConsensusCommand::StartHeight(1, validator_set));
123//!
124//!     // Poll for events
125//!     while let Some(event) = consensus.next_event().await {
126//!         match event {
127//!             ConsensusEvent::RequestProposal { height, round } => {
128//!                 println!("Need to propose at height {}, round {}", height, round);
129//!             }
130//!             ConsensusEvent::Decision { height, value } => {
131//!                 println!("Consensus reached at height {}: {:?}", height, value);
132//!             }
133//!             ConsensusEvent::Gossip(message) => {
134//!                 println!("Need to gossip: {:?}", message);
135//!             }
136//!             ConsensusEvent::Error(error) => {
137//!                 eprintln!("Consensus error: {}", error);
138//!             }
139//!         }
140//!     }
141//! }
142//! ```
143//!
144//! ## Commands and Events
145//!
146//! The consensus engine operates on a command/event model:
147//!
148//! - **Commands**: Send commands to the consensus engine via `handle_command()`
149//! - **Events**: Poll for events from the consensus engine via
150//!   `next_event().await`
151//!
152//! ## Crash Recovery
153//!
154//! The consensus engine supports crash recovery through write-ahead logging:
155//!
156//! ```rust
157//! // Recover from a previous crash
158//! let validator_sets = Arc::new(StaticValidatorSetProvider::new(validator_set));
159//! let mut consensus = Consensus::recover(config, validator_sets);
160//! ```
161
162use std::collections::{BTreeMap, HashMap, VecDeque};
163use std::fmt::{Debug, Display};
164use std::ops::{Add, Sub};
165use std::sync::Arc;
166
167use serde::de::DeserializeOwned;
168use serde::{Deserialize, Serialize};
169
170// Re-export consensus types needed by the public API
171pub use crate::config::{Config, TimeoutValues};
172use crate::internal::{InternalConsensus, InternalParams};
173use crate::wal::{FileWalSink, NoopWal, WalSink};
174
175mod config;
176mod internal;
177mod wal;
178
179/// A cryptographic signature for consensus messages.
180///
181/// This type is used to sign proposals and votes in the consensus protocol
182/// to ensure authenticity and integrity of consensus messages.
183pub type Signature = malachite_signing_ed25519::Signature;
184
185/// An Ed25519 signing key.
186///
187/// This is also called a secret key by other implementations.
188pub type SigningKey = ed25519_consensus::SigningKey;
189
190/// A trait for consensus validator addresses.
191///
192/// This trait defines the requirements for validator address types used in the
193/// consensus engine. Your validator address type must implement all the
194/// required traits to be compatible with the consensus engine.
195///
196/// ## Required Traits
197///
198/// - `Sync + Send`: Thread-safe and sendable across threads
199/// - `Ord + Display + Debug + Default + Clone`: Standard Rust traits for
200///   ordering, display, debugging, default values, and cloning
201/// - `Into<Vec<u8>>`: Convertible to bytes for serialization
202/// - `Serialize + DeserializeOwned`: Serde serialization support
203///
204/// ## Example Implementation
205///
206/// ```rust
207/// #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
208/// struct MyAddress(String);
209///
210/// impl std::fmt::Display for MyAddress {
211///     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212///         write!(f, "{}", self.0)
213///     }
214/// }
215///
216/// impl From<MyAddress> for Vec<u8> {
217///     fn from(addr: MyAddress) -> Self {
218///         addr.0.into_bytes()
219///     }
220/// }
221/// ```
222pub trait ValidatorAddress:
223    Sync + Send + Ord + Display + Debug + Default + Clone + Into<Vec<u8>> + Serialize + DeserializeOwned
224{
225}
226impl<T> ValidatorAddress for T where
227    T: Sync
228        + Send
229        + Ord
230        + Display
231        + Debug
232        + Default
233        + Clone
234        + Into<Vec<u8>>
235        + Serialize
236        + DeserializeOwned
237{
238}
239
240/// A trait for consensus value payloads.
241///
242/// This trait defines the requirements for consensus value types used in the
243/// consensus engine. Your consensus value type must implement all the required
244/// traits to be compatible with the consensus engine.
245///
246/// ## Required Traits
247///
248/// - `Sync + Send`: Thread-safe and sendable across threads
249/// - `Ord + Display + Debug + Default + Clone`: Standard Rust traits
250/// - `Serialize + DeserializeOwned`: Serde serialization support
251///
252/// ## Example Implementation
253///
254/// ```rust
255/// #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
256/// struct BlockData(String);
257///
258/// impl std::fmt::Display for BlockData {
259///     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260///         write!(f, "{}", self.0)
261///     }
262/// }
263/// ```
264pub trait ValuePayload:
265    Sync + Send + Ord + Display + Debug + Default + Clone + Serialize + DeserializeOwned
266{
267}
268impl<T> ValuePayload for T where
269    T: Sync + Send + Ord + Display + Debug + Default + Clone + Serialize + DeserializeOwned
270{
271}
272
273/// # Pathfinder consensus engine
274///
275/// This is the main consensus engine for Starknet nodes that implements
276/// Byzantine Fault Tolerant (BFT) consensus using the Malachite implementation
277/// of Tendermint. It's generic over validator addresses, consensus values, and
278/// proposer selection algorithms, making it suitable for Starknet's consensus
279/// requirements.
280///
281/// ## Generic Parameters
282///
283/// - `V`: Your consensus value type (must implement `ValuePayload`)
284/// - `A`: Your validator address type (must implement `ValidatorAddress`)
285/// - `P`: Your proposer selector type (must implement `ProposerSelector<A>`)
286///
287/// ## Usage
288///
289/// ```rust
290/// let config = Config::new(my_address);
291/// let mut consensus = Consensus::new(config);
292///
293/// // Start consensus at a height
294/// consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
295///
296/// // Poll for events
297/// while let Some(event) = consensus.next_event().await {
298///     // Handle events
299/// }
300/// ```
301///
302/// ## Custom Proposer Selection
303///
304/// To use a custom proposer selector:
305///
306/// ```rust
307/// let config = Config::new(my_address);
308/// let custom_selector = WeightedProposerSelector;
309/// let mut consensus = Consensus::new(config).with_proposer_selector(custom_selector);
310/// ```
311///
312/// ## Crash Recovery
313///
314/// The consensus engine supports crash recovery through write-ahead logging:
315///
316/// ```rust
317/// let validator_sets = Arc::new(StaticValidatorSetProvider::new(validator_set));
318/// let mut consensus = Consensus::recover(config, validator_sets)?;
319/// ```
320pub struct Consensus<
321    V: ValuePayload + 'static,
322    A: ValidatorAddress + 'static,
323    P: ProposerSelector<A> + Send + Sync + 'static,
324> {
325    internal: HashMap<u64, InternalConsensus<V, A, P>>,
326    event_queue: VecDeque<ConsensusEvent<V, A>>,
327    config: Config<A>,
328    proposer_selector: P,
329    min_kept_height: Option<u64>,
330}
331
332impl<
333        V: ValuePayload + 'static,
334        A: ValidatorAddress + 'static,
335        P: ProposerSelector<A> + Send + Sync + 'static,
336    > Consensus<V, A, P>
337{
338    /// Create a new consensus engine for the current validator.
339    ///
340    /// ## Arguments
341    ///
342    /// - `config`: The consensus configuration containing validator address,
343    ///   timeouts, and other settings
344    ///
345    /// ## Example
346    ///
347    /// ```rust
348    /// let config = Config::new(my_address);
349    /// let mut consensus = Consensus::new(config);
350    /// ```
351    pub fn new(config: Config<A>) -> DefaultConsensus<V, A> {
352        Consensus {
353            internal: HashMap::new(),
354            event_queue: VecDeque::new(),
355            config,
356            proposer_selector: RoundRobinProposerSelector,
357            min_kept_height: None,
358        }
359    }
360
361    /// Set the proposer selector for this consensus engine.
362    ///
363    /// This method consumes `self` and returns a new `Consensus` instance
364    /// with the specified proposer selector.
365    ///
366    /// ## Arguments
367    ///
368    /// - `proposer_selector`: The proposer selection algorithm to use
369    ///
370    /// ## Example
371    ///
372    /// ```rust
373    /// let config = Config::new(my_address);
374    /// let custom_selector = WeightedProposerSelector;
375    /// let mut consensus = Consensus::new(config).with_proposer_selector(custom_selector);
376    /// ```
377    pub fn with_proposer_selector<PS: ProposerSelector<A> + Send + Sync + 'static>(
378        self,
379        proposer_selector: PS,
380    ) -> Consensus<V, A, PS> {
381        Consensus {
382            internal: HashMap::new(),
383            event_queue: VecDeque::new(),
384            config: self.config,
385            proposer_selector,
386            min_kept_height: self.min_kept_height,
387        }
388    }
389
390    /// Recover recent heights from the write-ahead log.
391    ///
392    /// This method is used to recover consensus state after a crash or restart.
393    /// It reads the write-ahead log and reconstructs the consensus state for
394    /// all incomplete heights.
395    ///
396    /// ## Arguments
397    ///
398    /// - `config`: The consensus configuration
399    /// - `validator_sets`: A provider for validator sets at different heights
400    ///
401    /// ## Example
402    ///
403    /// ```rust
404    /// let validator_sets = Arc::new(StaticValidatorSetProvider::new(validator_set));
405    /// let mut consensus = Consensus::recover(config, validator_sets)?;
406    /// ```
407    pub fn recover<VS: ValidatorSetProvider<A> + 'static>(
408        config: Config<A>,
409        validator_sets: Arc<VS>,
410    ) -> anyhow::Result<DefaultConsensus<V, A>> {
411        use crate::wal::recovery;
412
413        tracing::info!(
414            validator = ?config.address,
415            wal_dir = %config.wal_dir.display(),
416            "Starting consensus recovery from WAL"
417        );
418
419        // Read the write-ahead log and recover all incomplete heights.
420        let incomplete_heights = match recovery::recover_incomplete_heights(&config.wal_dir) {
421            Ok(heights) => {
422                tracing::info!(
423                    validator = ?config.address,
424                    incomplete_heights = heights.len(),
425                    "Found incomplete heights to recover"
426                );
427                heights
428            }
429            Err(e) => {
430                tracing::error!(
431                    validator = ?config.address,
432                    wal_dir = %config.wal_dir.display(),
433                    error = %e,
434                    "Failed to recover incomplete heights from WAL"
435                );
436                Vec::new()
437            }
438        };
439
440        // Create a new consensus engine.
441        let mut consensus = Self::new(config);
442
443        // Manually recover all incomplete heights.
444        for (height, entries) in incomplete_heights {
445            tracing::info!(
446                validator = ?consensus.config.address,
447                height = %height,
448                entry_count = entries.len(),
449                "Recovering height from WAL"
450            );
451
452            let validator_set = validator_sets.get_validator_set(height)?;
453            let mut internal_consensus = consensus.create_consensus(height, &validator_set);
454            internal_consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
455            internal_consensus.recover_from_wal(entries);
456            consensus.internal.insert(height, internal_consensus);
457        }
458
459        tracing::info!(
460            validator = ?consensus.config.address,
461            recovered_heights = consensus.internal.len(),
462            "Completed consensus recovery"
463        );
464
465        Ok(consensus)
466    }
467
468    fn create_consensus(
469        &mut self,
470        height: u64,
471        validator_set: &ValidatorSet<A>,
472    ) -> InternalConsensus<V, A, P> {
473        let params = InternalParams {
474            height,
475            validator_set: validator_set.clone(),
476            address: self.config.address.clone(),
477            threshold_params: self.config.threshold_params,
478            value_payload: malachite_types::ValuePayload::ProposalOnly,
479        };
480
481        // Create a WAL for the height. If we fail, use a NoopWal.
482        let wal = match FileWalSink::new(&self.config.address, height, &self.config.wal_dir) {
483            Ok(wal) => Box::new(wal) as Box<dyn WalSink<V, A>>,
484            Err(e) => {
485                tracing::error!(
486                    validator = ?self.config.address,
487                    height = %height,
488                    error = %e,
489                    "Failed to create wal for height"
490                );
491                Box::new(NoopWal)
492            }
493        };
494
495        // A new consensus is created for every new height.
496        InternalConsensus::new(
497            params,
498            self.config.timeout_values.clone(),
499            wal,
500            self.proposer_selector.clone(),
501        )
502    }
503
504    /// Feed a command into the consensus engine.
505    ///
506    /// This method is the primary way to interact with the consensus engine.
507    /// Commands include starting new heights, submitting proposals, and
508    /// processing votes.
509    ///
510    /// ## Arguments
511    ///
512    /// - `cmd`: The command to process
513    ///
514    /// ## Example
515    ///
516    /// ```rust
517    /// // Start a new height
518    /// consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
519    ///
520    /// // Submit a proposal
521    /// consensus.handle_command(ConsensusCommand::Proposal(signed_proposal));
522    ///
523    /// // Process a vote
524    /// consensus.handle_command(ConsensusCommand::Vote(signed_vote));
525    /// ```
526    pub fn handle_command(&mut self, cmd: ConsensusCommand<V, A>) {
527        match cmd {
528            // Start a new height.
529            ConsensusCommand::StartHeight(height, validator_set) => {
530                // A new consensus is created for every new height.
531                let mut consensus = self.create_consensus(height, &validator_set);
532                consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
533                self.internal.insert(height, consensus);
534                tracing::debug!(
535                    validator = ?self.config.address,
536                    height = %height,
537                    "Started new consensus"
538                );
539            }
540            other => {
541                let height = other.height();
542                if let Some(engine) = self.internal.get_mut(&height) {
543                    engine.handle_command(other);
544                } else {
545                    tracing::warn!(
546                        validator = ?self.config.address,
547                        height = %height,
548                        command = ?other,
549                        "Received command for unknown height"
550                    );
551                }
552            }
553        }
554    }
555
556    /// Poll all engines for an event.
557    ///
558    /// This method should be called regularly to process events from the
559    /// consensus engine. Events include requests for proposals, decisions,
560    /// gossip messages, and errors.
561    ///
562    /// ## Returns
563    ///
564    /// Returns `Some(event)` if an event is available, or `None` if no events
565    /// are ready.
566    ///
567    /// ## Example
568    ///
569    /// ```rust
570    /// while let Some(event) = consensus.next_event().await {
571    ///     match event {
572    ///         ConsensusEvent::RequestProposal { height, round } => {
573    ///             // Build and submit a proposal
574    ///         }
575    ///         ConsensusEvent::Decision { height, value } => {
576    ///             // Consensus reached, process the value
577    ///         }
578    ///         ConsensusEvent::Gossip(message) => {
579    ///             // Send message to peers
580    ///         }
581    ///         ConsensusEvent::Error(error) => {
582    ///             // Handle error
583    ///         }
584    ///     }
585    /// }
586    /// ```
587    pub async fn next_event(&mut self) -> Option<ConsensusEvent<V, A>> {
588        let mut finished_heights = Vec::new();
589        // Drain each internal engine.
590        for (height, engine) in self.internal.iter_mut() {
591            if let Some(event) = engine.poll_internal().await {
592                tracing::trace!(
593                    validator = ?self.config.address,
594                    height = %height,
595                    event = ?event,
596                    "Engine returned event"
597                );
598                // Track finished heights.
599                if let ConsensusEvent::Decision { height, .. } = &event {
600                    finished_heights.push(*height);
601                }
602                // Push the event to the queue.
603                self.event_queue.push_back(event);
604            }
605        }
606
607        // Prune old engines if we have any finished heights.
608        if !finished_heights.is_empty() {
609            self.prune_old_engines();
610        }
611
612        // Return the first event from the queue.
613        self.event_queue.pop_front()
614    }
615
616    /// Prune old engines from the internal map.
617    fn prune_old_engines(&mut self) {
618        let max_height = self.internal.keys().max().copied();
619        if let Some(max_height) = max_height {
620            let new_min_height = max_height.checked_sub(self.config.history_depth);
621
622            if let Some(new_min) = new_min_height {
623                self.min_kept_height = Some(new_min);
624                self.internal.retain(|height, _| *height >= new_min);
625
626                tracing::debug!(
627                    validator = ?self.config.address,
628                    min_height = %new_min,
629                    max_height = %max_height,
630                    "Pruned old consensus engines"
631                );
632            }
633        }
634    }
635
636    /// Check if a specific height has been finalized (i.e., a decision has been
637    /// reached)
638    ///
639    /// ## Arguments
640    ///
641    /// - `height`: The height to check
642    ///
643    /// ## Returns
644    ///
645    /// Returns `true` if the height has been finalized, `false` otherwise.
646    ///
647    /// ## Example
648    ///
649    /// ```rust
650    /// if consensus.is_height_finalized(height) {
651    ///     println!("Height {} has been finalized", height);
652    /// }
653    /// ```
654    pub fn is_height_finalized(&self, height: u64) -> bool {
655        if let Some(engine) = self.internal.get(&height) {
656            engine.is_finalized()
657        } else {
658            // If the height is not in our internal map, it might have been pruned
659            // after being finalized, so we assume it's finalized
660            if let Some(min_height) = self.min_kept_height {
661                if height < min_height {
662                    return true;
663                }
664            }
665            false
666        }
667    }
668
669    /// Get the current maximum height being tracked by the consensus engine.
670    pub fn current_height(&self) -> Option<u64> {
671        self.internal.keys().max().copied()
672    }
673}
674
675/// A round number (or `None` if the round is nil).
676///
677/// This type represents a consensus round number. A round can be either a
678/// specific round number or nil (None), which represents a special state in the
679/// consensus protocol.
680///
681/// ## Example
682///
683/// ```rust
684/// let round = Round::new(5); // Round 5
685/// let nil_round = Round::nil(); // Nil round
686/// ```
687#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
688pub struct Round(pub Option<u32>);
689
690impl Round {
691    /// Create a new round with the given round number.
692    pub fn new(round: u32) -> Self {
693        Self(Some(round))
694    }
695
696    /// Create a nil round.
697    pub fn nil() -> Self {
698        Self(None)
699    }
700
701    /// Get the round number as a u32, if it's not nil.
702    pub fn as_u32(&self) -> Option<u32> {
703        self.0
704    }
705}
706
707impl From<u32> for Round {
708    fn from(round: u32) -> Self {
709        Self::new(round)
710    }
711}
712
713impl Add<u32> for Round {
714    type Output = Self;
715
716    fn add(self, rhs: u32) -> Self::Output {
717        Self(self.0.map(|round| round + rhs))
718    }
719}
720
721impl Sub<u32> for Round {
722    type Output = Self;
723
724    fn sub(self, rhs: u32) -> Self::Output {
725        Self(self.0.map(|round| round - rhs))
726    }
727}
728
729impl Display for Round {
730    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
731        match self.0 {
732            Some(round) => write!(f, "{round}"),
733            None => write!(f, "Nil"),
734        }
735    }
736}
737
738impl Debug for Round {
739    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
740        write!(f, "{self}")
741    }
742}
743
744/// A proposal for a block value in a consensus round.
745///
746/// A proposal is created by the designated proposer for a given height and
747/// round. It contains the proposed block value along with additional metadata.
748#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
749pub struct Proposal<V, A> {
750    /// The blockchain height
751    pub height: u64,
752    /// The consensus round number
753    pub round: Round,
754    /// The proposed consensus value
755    pub value: V,
756    /// The POL round for which the proposal is for
757    pub pol_round: Round,
758    /// The address of the proposer
759    pub proposer: A,
760}
761
762impl<V: Debug, A: Debug> std::fmt::Debug for Proposal<V, A> {
763    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
764        write!(
765            f,
766            "H:{} R:{} From:{:?} Val:{:?}",
767            self.height, self.round, self.proposer, self.value
768        )
769    }
770}
771
772/// The type of vote.
773#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
774pub enum VoteType {
775    /// A preliminary vote
776    Prevote,
777    /// A final vote that commits to a value
778    Precommit,
779}
780
781/// A vote for a value in a consensus round.
782///
783/// A vote is cast by a validator to indicate their agreement or disagreement
784/// with a proposed block value. The vote includes the validator's address, the
785/// round number, and the block value being voted on.
786#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
787pub struct Vote<V, A> {
788    /// The type of vote (Prevote or Precommit)
789    pub r#type: VoteType,
790    /// The blockchain height
791    pub height: u64,
792    /// The consensus round number
793    pub round: Round,
794    /// The value being voted on (None for nil votes)
795    pub value: Option<V>,
796    /// The address of the validator casting the vote
797    pub validator_address: A,
798}
799
800impl<V: Debug, A: Debug> std::fmt::Debug for Vote<V, A> {
801    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
802        let val = match &self.value {
803            Some(val) => format!("{val:?}"),
804            None => "Nil".to_string(),
805        };
806        write!(
807            f,
808            "H:{} R:{} {:?} From:{:?} Val:{val}",
809            self.height, self.round, self.r#type, self.validator_address
810        )
811    }
812}
813
814impl<V, A> Vote<V, A> {
815    /// Check if the vote is nil.
816    ///
817    /// A nil vote is a vote that does not commit to a value.
818    pub fn is_nil(&self) -> bool {
819        self.value.is_none()
820    }
821}
822
823/// A fully validated, signed proposal ready to enter consensus.
824///
825/// This type wraps a proposal with a cryptographic signature to ensure
826/// authenticity and integrity.
827#[derive(Clone, Serialize, Deserialize)]
828pub struct SignedProposal<V, A> {
829    pub proposal: Proposal<V, A>,
830    pub signature: Signature,
831}
832
833/// A signed vote.
834///
835/// This type wraps a vote with a cryptographic signature to ensure
836/// authenticity and integrity.
837#[derive(Clone, Serialize, Deserialize)]
838pub struct SignedVote<V, A> {
839    pub vote: Vote<V, A>,
840    pub signature: Signature,
841}
842
843// Note: We intentionally ignore the signature as it's not used yet.
844impl<V: Debug, A: Debug> std::fmt::Debug for SignedProposal<V, A> {
845    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
846        write!(f, "{:?}", self.proposal)
847    }
848}
849
850// Note: We intentionally ignore the signature as it's not used yet.
851impl<V: Debug, A: Debug> std::fmt::Debug for SignedVote<V, A> {
852    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
853        write!(f, "{:?}", self.vote)
854    }
855}
856
857/// A public key for the consensus protocol.
858///
859/// This type is used to verify signatures on proposals and votes in the
860/// consensus protocol. Each validator has an associated public key that is used
861/// to authenticate their messages.
862pub type PublicKey = malachite_signing_ed25519::PublicKey;
863
864/// A validator's voting power.
865pub type VotingPower = u64;
866
867/// A validator in the consensus protocol.
868///
869/// Each validator has an associated address and public key to uniquely identify
870/// them. The voting power determines their weight in consensus decisions.
871#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
872pub struct Validator<A> {
873    /// The validator's address
874    pub address: A,
875    /// The validator's public key for signature verification
876    pub public_key: PublicKey,
877    /// The validator's voting power (weight in consensus)
878    pub voting_power: VotingPower,
879}
880
881impl<A: Debug> std::fmt::Debug for Validator<A> {
882    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
883        write!(f, "{:?} ({})", self.address, self.voting_power)
884    }
885}
886
887impl<A> Validator<A> {
888    /// Create a new validator with the given address and public key.
889    ///
890    /// The voting power defaults to 1.
891    pub fn new(address: A, public_key: PublicKey) -> Self {
892        Self {
893            address,
894            public_key,
895            voting_power: 1,
896        }
897    }
898
899    /// Set the voting power for the validator.
900    ///
901    /// This method returns `self` for method chaining.
902    pub fn with_voting_power(mut self, voting_power: VotingPower) -> Self {
903        self.voting_power = voting_power;
904        self
905    }
906}
907
908/// A validator set represents a group of consensus participants.
909///
910/// The validator set defines who can participate in consensus at a given
911/// height. Each validator in the set has a voting power that determines their
912/// weight in consensus decisions.
913#[derive(Clone, Debug, PartialEq, Eq)]
914pub struct ValidatorSet<A> {
915    /// The list of validators in the set
916    pub validators: Vec<Validator<A>>,
917}
918
919impl<A: Clone + Ord> ValidatorSet<A> {
920    /// Create a new validator set with the given validators.
921    pub fn new(validators: impl IntoIterator<Item = Validator<A>>) -> Self {
922        // Ensure validators are unique by address.
923        let validators: BTreeMap<A, Validator<A>> = validators
924            .into_iter()
925            .map(|v| (v.address.clone(), v))
926            .collect();
927        assert!(!validators.is_empty());
928        let validators = validators.into_values().collect::<Vec<Validator<A>>>();
929        Self { validators }
930    }
931
932    /// Get the number of validators in the set.
933    pub fn count(&self) -> usize {
934        self.validators.len()
935    }
936}
937
938/// Commands that the application can send into the consensus engine.
939///
940/// These commands represent the primary interface for interacting with the
941/// consensus engine. They allow the application to start new heights,
942/// submit proposals, and process votes.
943pub enum ConsensusCommand<V, A> {
944    /// Start consensus at a given height with the validator set.
945    StartHeight(u64, ValidatorSet<A>),
946    /// A complete, locally validated and signed proposal that we create as the
947    /// proposer for the current round.
948    Propose(Proposal<V, A>),
949    /// A complete, locally validated and signed proposal that was received over
950    /// the network from another validator.
951    Proposal(SignedProposal<V, A>),
952    /// A signed vote received from the network.
953    Vote(SignedVote<V, A>),
954}
955
956impl<V, A> ConsensusCommand<V, A> {
957    /// Get the consensus height associated with the command.
958    pub fn height(&self) -> u64 {
959        match self {
960            ConsensusCommand::StartHeight(height, _) => *height,
961            ConsensusCommand::Propose(proposal) => proposal.height,
962            ConsensusCommand::Proposal(proposal) => proposal.proposal.height,
963            ConsensusCommand::Vote(vote) => vote.vote.height,
964        }
965    }
966}
967
968impl<V: Debug, A: Debug> std::fmt::Debug for ConsensusCommand<V, A> {
969    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
970        match self {
971            ConsensusCommand::StartHeight(height, validator_set) => write!(
972                f,
973                "StartHeight({}, [{}])",
974                height,
975                validator_set
976                    .validators
977                    .iter()
978                    .map(|v| format!("{:?}", v.address))
979                    .collect::<Vec<_>>()
980                    .join(", ")
981            ),
982            ConsensusCommand::Propose(proposal) => write!(f, "Propose({proposal:?})"),
983            ConsensusCommand::Proposal(proposal) => write!(f, "Proposal({proposal:?})"),
984            ConsensusCommand::Vote(vote) => write!(f, "Vote({vote:?})"),
985        }
986    }
987}
988
989/// A message to be gossiped to peers.
990///
991/// These messages represent network communication that needs to be sent to
992/// other validators in the network.
993#[derive(Clone, Debug)]
994pub enum NetworkMessage<V, A> {
995    /// A complete, locally validated and signed proposal ready to be gossiped.
996    Proposal(SignedProposal<V, A>),
997    /// A vote received from the network.
998    Vote(SignedVote<V, A>),
999}
1000
1001/// Events that the consensus engine emits for the application to handle.
1002///
1003/// These events represent the output of the consensus engine and tell the
1004/// application what actions it needs to take.
1005pub enum ConsensusEvent<V, A> {
1006    /// The consensus wants this message to be gossiped to peers.
1007    ///
1008    /// The application should send this message to all peers in the network.
1009    Gossip(NetworkMessage<V, A>),
1010    /// The consensus needs the app to build and inject a proposal.
1011    ///
1012    /// The application should create a proposal for the given height and round,
1013    /// then submit it to the consensus engine.
1014    RequestProposal { height: u64, round: u32 },
1015    /// The consensus has reached a decision and committed a block.
1016    ///
1017    /// This event indicates that consensus has been reached for the given
1018    /// height and the value should be committed to the blockchain.
1019    Decision { height: u64, round: u32, value: V },
1020    /// An internal error occurred in consensus.
1021    ///
1022    /// The application should handle this error appropriately, possibly by
1023    /// logging it or taking corrective action.
1024    Error(anyhow::Error),
1025}
1026
1027impl<V: Debug, A: Debug> std::fmt::Debug for ConsensusEvent<V, A> {
1028    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1029        match self {
1030            ConsensusEvent::Gossip(msg) => match msg {
1031                NetworkMessage::Proposal(proposal) => write!(f, "Gossip(Proposal({proposal:?}))"),
1032                NetworkMessage::Vote(vote) => write!(f, "Gossip(Vote({vote:?}))"),
1033            },
1034            ConsensusEvent::RequestProposal { height, round, .. } => {
1035                write!(f, "RequestProposal(H:{height} R:{round})")
1036            }
1037            ConsensusEvent::Decision {
1038                height,
1039                round,
1040                value,
1041            } => {
1042                write!(f, "Decision(H:{height} R: {round} Val:{value:?})")
1043            }
1044            ConsensusEvent::Error(error) => write!(f, "Error({error:?})"),
1045        }
1046    }
1047}
1048
1049/// A trait for retrieving the validator set at a specific blockchain height.
1050///
1051/// This trait allows consensus to dynamically determine the set of validators
1052/// that are eligible to participate in consensus at any given height.
1053///
1054/// This is useful for handling validator set changes across heights.
1055pub trait ValidatorSetProvider<A> {
1056    fn get_validator_set(&self, height: u64) -> Result<ValidatorSet<A>, anyhow::Error>;
1057}
1058
1059/// A trait for selecting the proposer for a given height and round.
1060///
1061/// This trait allows consumers to provide custom proposer selection logic
1062/// instead of using the default round-robin approach. This is useful for
1063/// implementing more sophisticated proposer selection algorithms like
1064/// VRF-based selection, weighted selection, or other custom logic.
1065///
1066/// ## Example Implementation
1067///
1068/// ```rust
1069/// use pathfinder_consensus::*;
1070///
1071/// struct RoundRobinProposerSelector;
1072///
1073/// impl<A: ValidatorAddress> ProposerSelector<A> for RoundRobinProposerSelector {
1074///     fn select_proposer<'a>(
1075///         &self,
1076///         validator_set: &'a ValidatorSet<A>,
1077///         height: u64,
1078///         round: u32,
1079///     ) -> &'a Validator<A> {
1080///         let index = round as usize % validator_set.count();
1081///         &validator_set.validators[index]
1082///     }
1083/// }
1084/// ```
1085pub trait ProposerSelector<A: ValidatorAddress>: Clone + Send + Sync {
1086    /// Select the proposer for the given height and round.
1087    ///
1088    /// ## Arguments
1089    ///
1090    /// - `validator_set`: The set of validators eligible to propose
1091    /// - `height`: The blockchain height
1092    /// - `round`: The consensus round number
1093    ///
1094    /// ## Returns
1095    ///
1096    /// Returns a reference to the selected validator who should propose
1097    /// for the given height and round.
1098    fn select_proposer<'a>(
1099        &self,
1100        validator_set: &'a ValidatorSet<A>,
1101        height: u64,
1102        round: u32,
1103    ) -> &'a Validator<A>;
1104}
1105
1106/// A default proposer selector that uses round-robin selection.
1107///
1108/// This is the default proposer selection algorithm that selects proposers
1109/// in a round-robin fashion based on the round number modulo the number of
1110/// validators.
1111#[derive(Clone, Default)]
1112pub struct RoundRobinProposerSelector;
1113
1114impl<A: ValidatorAddress> ProposerSelector<A> for RoundRobinProposerSelector {
1115    fn select_proposer<'a>(
1116        &self,
1117        validator_set: &'a ValidatorSet<A>,
1118        _height: u64,
1119        round: u32,
1120    ) -> &'a Validator<A> {
1121        let index = round as usize % validator_set.count();
1122        &validator_set.validators[index]
1123    }
1124}
1125
1126/// A type alias for consensus with the default round-robin proposer selector.
1127///
1128/// This provides a convenient way to create consensus instances without
1129/// specifying the proposer selector type.
1130pub type DefaultConsensus<V, A> = Consensus<V, A, RoundRobinProposerSelector>;
1131
1132/// A validator set provider that always returns the same validator set.
1133///
1134/// This is a simple implementation of `ValidatorSetProvider` that returns
1135/// the same validator set for all heights. This is useful for testing or
1136/// for applications where the validator set doesn't change.
1137pub struct StaticValidatorSetProvider<A> {
1138    validator_set: ValidatorSet<A>,
1139}
1140
1141impl<A> StaticValidatorSetProvider<A> {
1142    /// Create a new static validator set provider.
1143    ///
1144    /// ## Arguments
1145    ///
1146    /// - `validator_set`: The validator set to return for all heights
1147    pub fn new(validator_set: ValidatorSet<A>) -> Self {
1148        Self { validator_set }
1149    }
1150}
1151
1152impl<A: Clone + Send + Sync> ValidatorSetProvider<A> for StaticValidatorSetProvider<A> {
1153    fn get_validator_set(&self, _height: u64) -> Result<ValidatorSet<A>, anyhow::Error> {
1154        Ok(self.validator_set.clone())
1155    }
1156}
1157
1158#[cfg(test)]
1159mod tests {
1160    use super::*;
1161
1162    #[test]
1163    fn regression_validator_set_is_unique_by_address() {
1164        let with_duplicates = [1, 1, 2, 2, 2, 3, 3, 3, 3, 2, 1, 1, 2, 3, 2, 2, 1, 1, 3, 3]
1165            .into_iter()
1166            .map(|i| Validator::new(i, crate::PublicKey::from_bytes([0; 32])));
1167        let set = ValidatorSet::new(with_duplicates);
1168
1169        assert_eq!(
1170            set.validators.iter().map(|v| v.address).collect::<Vec<_>>(),
1171            vec![1, 2, 3]
1172        );
1173    }
1174}