Skip to main content

pathfinder_consensus/
lib.rs

1//! # Pathfinder Consensus
2//!
3//! A Byzantine Fault Tolerant (BFT) consensus engine for Starknet nodes.
4//!
5//! ## Overview
6//!
7//! This crate provides a consensus engine for Starknet nodes that wraps the
8//! Malachite implementation of the Tendermint BFT consensus algorithm. It's
9//! designed to be generic over validator addresses and consensus values, making
10//! it suitable for Starknet's consensus requirements.
11//!
12//! ## Core Concepts
13//!
14//! ### ValidatorAddress Trait
15//!
16//! Your validator address type must implement the `ValidatorAddress` trait,
17//! which requires:
18//! - `Sync + Send`: Thread-safe and sendable across threads
19//! - `Ord + Display + Debug + Default + Clone`: Standard Rust traits for
20//!   ordering, display, debugging, default values, and cloning
21//! - `Into<Vec<u8>>`: Convertible to bytes for serialization
22//! - `Serialize + DeserializeOwned`: Serde serialization support
23//!
24//! ### ValuePayload Trait
25//!
26//! Your consensus value type must implement the `ValuePayload` trait, which
27//! requires:
28//! - `Sync + Send`: Thread-safe and sendable across threads
29//! - `Ord + Display + Debug + Default + Clone`: Standard Rust traits
30//! - `Serialize + DeserializeOwned`: Serde serialization support
31//!
32//! ### Consensus Engine
33//!
//! The main `Consensus<V, A, P>` struct is generic over:
//! - `V`: Your consensus value type (must implement `ValuePayload`)
//! - `A`: Your validator address type (must implement `ValidatorAddress`)
//! - `P`: Your proposer selector type (must implement `ProposerSelector<A>`)
37//!
38//! ## Usage Example
39//!
40//! ```rust
41//! use pathfinder_consensus::*;
42//! use serde::{Deserialize, Serialize};
43//!
44//! // Define your validator address type
45//! #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
46//! struct MyAddress(String);
47//!
48//! impl std::fmt::Display for MyAddress {
49//!     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50//!         write!(f, "{}", self.0)
51//!     }
52//! }
53//!
54//! impl From<MyAddress> for Vec<u8> {
55//!     fn from(addr: MyAddress) -> Self {
56//!         addr.0.into_bytes()
57//!     }
58//! }
59//!
60//! // Define your consensus value type
61//! #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
62//! struct BlockData(String);
63//!
64//! impl std::fmt::Display for BlockData {
65//!     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66//!         write!(f, "{}", self.0)
67//!     }
68//! }
69//!
70//! // Custom proposer selector that uses weighted selection
71//! #[derive(Clone)]
72//! struct WeightedProposerSelector;
73//!
74//! impl ProposerSelector<MyAddress> for WeightedProposerSelector {
75//!     fn select_proposer<'a>(
76//!         &self,
77//!         validator_set: &'a ValidatorSet<MyAddress>,
78//!         _height: u64,
79//!         round: u32,
80//!     ) -> &'a Validator<MyAddress> {
81//!         // Simple weighted selection based on voting power
82//!         let total_power: u64 = validator_set
83//!             .validators
84//!             .iter()
85//!             .map(|v| v.voting_power)
86//!             .sum();
87//!         let selection = (round as u64) % total_power;
88//!
89//!         let mut cumulative = 0;
90//!         for validator in &validator_set.validators {
91//!             cumulative += validator.voting_power;
92//!             if selection < cumulative {
93//!                 return validator;
94//!             }
95//!         }
96//!
97//!         // Fallback to first validator
98//!         &validator_set.validators[0]
99//!     }
100//! }
101//!
102//! #[tokio::main]
103//! async fn main() {
104//!     // Create configuration
105//!     let my_address = MyAddress("validator_1".to_string());
106//!     let config = Config::new(my_address.clone());
107//!
108//!     // Create consensus engine with custom proposer selector
109//!     let proposer_selector = WeightedProposerSelector;
110//!     let mut consensus = Consensus::with_proposer_selector(config, proposer_selector);
111//!
112//!     // Or use the default round-robin selector (no additional configuration needed)
113//!     // let mut consensus = Consensus::new(config);
114//!
115//!     // Start consensus at height 1
116//!     let validator_set = ValidatorSet::new(vec![Validator::new(
117//!         my_address.clone(),
118//!         PublicKey::from_bytes([0; 32]),
119//!     )
120//!     .with_voting_power(10)]);
121//!
122//!     consensus.handle_command(ConsensusCommand::StartHeight(1, validator_set));
123//!
124//!     // Poll for events
125//!     while let Some(event) = consensus.next_event().await {
126//!         match event {
127//!             ConsensusEvent::RequestProposal { height, round } => {
128//!                 println!("Need to propose at height {}, round {}", height, round);
129//!             }
130//!             ConsensusEvent::Decision { height, value } => {
131//!                 println!("Consensus reached at height {}: {:?}", height, value);
132//!             }
133//!             ConsensusEvent::Gossip(message) => {
134//!                 println!("Need to gossip: {:?}", message);
135//!             }
136//!             ConsensusEvent::Error(error) => {
137//!                 eprintln!("Consensus error: {}", error);
138//!             }
139//!         }
140//!     }
141//! }
142//! ```
143//!
144//! ## Commands and Events
145//!
146//! The consensus engine operates on a command/event model:
147//!
148//! - **Commands**: Send commands to the consensus engine via `handle_command()`
149//! - **Events**: Poll for events from the consensus engine via
150//!   `next_event().await`
151//!
152//! ## Crash Recovery
153//!
154//! The consensus engine supports crash recovery through write-ahead logging:
155//!
156//! ```rust
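//! // Recover from a previous crash. `highest_committed` is the highest block
//! // already committed to main storage (or `None`).
//! let validator_sets = Arc::new(StaticValidatorSetProvider::new(validator_set));
//! let mut consensus = Consensus::recover(config, validator_sets, highest_committed)?;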
160//! ```
161
162use std::collections::{BTreeMap, HashMap, VecDeque};
163use std::fmt::{Debug, Display};
164use std::ops::{Add, Sub};
165use std::sync::Arc;
166
167use serde::de::DeserializeOwned;
168use serde::{Deserialize, Serialize};
169
170// Re-export consensus types needed by the public API
171pub use crate::config::{Config, TimeoutValues};
172pub use crate::error::ConsensusError;
173use crate::internal::{InternalConsensus, InternalParams};
174use crate::wal::{delete_wal_file, FileWalSink, NoopWal, WalSink};
175
176mod config;
177mod error;
178mod internal;
179mod wal;
180
181/// A cryptographic signature for consensus messages.
182///
183/// This type is used to sign proposals and votes in the consensus protocol
184/// to ensure authenticity and integrity of consensus messages.
185pub type Signature = malachite_signing_ed25519::Signature;
186
187/// An Ed25519 signing key.
188///
189/// This is also called a secret key by other implementations.
190pub type SigningKey = ed25519_consensus::SigningKey;
191
192/// A trait for consensus validator addresses.
193///
194/// This trait defines the requirements for validator address types used in the
195/// consensus engine. Your validator address type must implement all the
196/// required traits to be compatible with the consensus engine.
197///
198/// ## Required Traits
199///
200/// - `Sync + Send`: Thread-safe and sendable across threads
201/// - `Ord + Display + Debug + Default + Clone`: Standard Rust traits for
202///   ordering, display, debugging, default values, and cloning
203/// - `Into<Vec<u8>>`: Convertible to bytes for serialization
204/// - `Serialize + DeserializeOwned`: Serde serialization support
205///
206/// ## Example Implementation
207///
208/// ```rust
209/// #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
210/// struct MyAddress(String);
211///
212/// impl std::fmt::Display for MyAddress {
213///     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214///         write!(f, "{}", self.0)
215///     }
216/// }
217///
218/// impl From<MyAddress> for Vec<u8> {
219///     fn from(addr: MyAddress) -> Self {
220///         addr.0.into_bytes()
221///     }
222/// }
223/// ```
224pub trait ValidatorAddress:
225    Sync + Send + Ord + Display + Debug + Default + Clone + Into<Vec<u8>> + Serialize + DeserializeOwned
226{
227}
228impl<T> ValidatorAddress for T where
229    T: Sync
230        + Send
231        + Ord
232        + Display
233        + Debug
234        + Default
235        + Clone
236        + Into<Vec<u8>>
237        + Serialize
238        + DeserializeOwned
239{
240}
241
242/// A trait for consensus value payloads.
243///
244/// This trait defines the requirements for consensus value types used in the
245/// consensus engine. Your consensus value type must implement all the required
246/// traits to be compatible with the consensus engine.
247///
248/// ## Required Traits
249///
250/// - `Sync + Send`: Thread-safe and sendable across threads
251/// - `Ord + Display + Debug + Default + Clone`: Standard Rust traits
252/// - `Serialize + DeserializeOwned`: Serde serialization support
253///
254/// ## Example Implementation
255///
256/// ```rust
257/// #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
258/// struct BlockData(String);
259///
260/// impl std::fmt::Display for BlockData {
261///     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262///         write!(f, "{}", self.0)
263///     }
264/// }
265/// ```
266pub trait ValuePayload:
267    Sync + Send + Ord + Display + Debug + Default + Clone + Serialize + DeserializeOwned
268{
269}
270impl<T> ValuePayload for T where
271    T: Sync + Send + Ord + Display + Debug + Default + Clone + Serialize + DeserializeOwned
272{
273}
274
275/// # Pathfinder consensus engine
276///
277/// This is the main consensus engine for Starknet nodes that implements
278/// Byzantine Fault Tolerant (BFT) consensus using the Malachite implementation
279/// of Tendermint. It's generic over validator addresses, consensus values, and
280/// proposer selection algorithms, making it suitable for Starknet's consensus
281/// requirements.
282///
283/// ## Generic Parameters
284///
285/// - `V`: Your consensus value type (must implement `ValuePayload`)
286/// - `A`: Your validator address type (must implement `ValidatorAddress`)
287/// - `P`: Your proposer selector type (must implement `ProposerSelector<A>`)
288///
289/// ## Usage
290///
291/// ```rust
292/// let config = Config::new(my_address);
293/// let mut consensus = Consensus::new(config);
294///
295/// // Start consensus at a height
296/// consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
297///
298/// // Poll for events
299/// while let Some(event) = consensus.next_event().await {
300///     // Handle events
301/// }
302/// ```
303///
304/// ## Custom Proposer Selection
305///
306/// To use a custom proposer selector:
307///
308/// ```rust
309/// let config = Config::new(my_address);
310/// let custom_selector = WeightedProposerSelector;
311/// let mut consensus = Consensus::with_proposer_selector(config, custom_selector);
312/// ```
313///
314/// ## Crash Recovery
315///
316/// The consensus engine supports crash recovery through write-ahead logging:
317///
318/// ```rust
319/// let validator_sets = Arc::new(StaticValidatorSetProvider::new(validator_set));
320/// let mut consensus = Consensus::recover(config, validator_sets)?;
321/// ```
322pub struct Consensus<
323    V: ValuePayload + 'static,
324    A: ValidatorAddress + 'static,
325    P: ProposerSelector<A> + Send + Sync + 'static,
326> {
327    internal: HashMap<u64, InternalConsensus<V, A, P>>,
328    event_queue: VecDeque<ConsensusEvent<V, A>>,
329    config: Config<A>,
330    proposer_selector: P,
331    min_kept_height: Option<u64>,
332    last_decided_height: Option<u64>,
333}
334
335impl<
336        V: ValuePayload + 'static,
337        A: ValidatorAddress + 'static,
338        P: ProposerSelector<A> + Send + Sync + 'static,
339    > Consensus<V, A, P>
340{
341    /// Create a new consensus engine for the current validator.
342    ///
343    /// ## Arguments
344    ///
345    /// - `config`: The consensus configuration containing validator address,
346    ///   timeouts, and other settings
347    ///
348    /// ## Example
349    ///
350    /// ```rust
351    /// let config = Config::new(my_address);
352    /// let mut consensus = Consensus::new(config);
353    /// ```
354    pub fn new(config: Config<A>) -> DefaultConsensus<V, A> {
355        Consensus {
356            internal: HashMap::new(),
357            event_queue: VecDeque::new(),
358            config,
359            proposer_selector: RoundRobinProposerSelector,
360            min_kept_height: None,
361            last_decided_height: None,
362        }
363    }
364
365    /// Create a new consensus engine with a custom proposer selector for this
366    /// consensus engine.
367    ///
368    /// ## Arguments
369    ///
370    /// - `config`: The consensus configuration containing validator address,
371    ///   timeouts, and other settings
372    /// - `proposer_selector`: The proposer selection algorithm to use
373    ///
374    /// ## Example
375    ///
376    /// ```rust
377    /// let config = Config::new(my_address);
378    /// let custom_selector = WeightedProposerSelector;
379    /// let mut consensus = Consensus::with_proposer_selector(config, custom_selector);
380    /// ```
381    pub fn with_proposer_selector<PS: ProposerSelector<A> + Send + Sync + 'static>(
382        config: Config<A>,
383        proposer_selector: PS,
384    ) -> Consensus<V, A, PS> {
385        Consensus {
386            internal: HashMap::new(),
387            event_queue: VecDeque::new(),
388            config,
389            proposer_selector,
390            min_kept_height: None,
391            last_decided_height: None,
392        }
393    }
394
395    /// Recover recent heights from the write-ahead log.
396    ///
397    /// This method is used to recover consensus state after a crash or restart.
398    /// It reads the write-ahead log and reconstructs the consensus state for
399    /// all incomplete heights.
400    ///
401    /// ## Arguments
402    ///
403    /// - `config`: The consensus configuration
404    /// - `validator_sets`: A provider for validator sets at different heights
405    /// - `highest_committed`: The highest committed block in main storage
406    ///
407    /// ## Example
408    ///
409    /// ```rust
410    /// let validator_sets = Arc::new(StaticValidatorSetProvider::new(validator_set));
411    /// let mut consensus = Consensus::recover(config, validator_sets)?;
412    /// ```
413    pub fn recover<VS: ValidatorSetProvider<A> + 'static>(
414        config: Config<A>,
415        validator_sets: Arc<VS>,
416        highest_committed: Option<u64>,
417    ) -> anyhow::Result<DefaultConsensus<V, A>> {
418        Self::recover_inner(
419            Self::new(config.clone()),
420            config,
421            validator_sets,
422            highest_committed,
423        )
424    }
425
426    /// Recover recent heights from the write-ahead log with a custom proposer
427    /// selector for this consensus engine.
428    ///
429    /// This method is used to recover consensus state after a crash or restart.
430    /// It reads the write-ahead log and reconstructs the consensus state for
431    /// all incomplete heights.
432    ///
433    /// ## Arguments
434    ///
435    /// - `config`: The consensus configuration
436    /// - `validator_sets`: A provider for validator sets at different heights
437    /// - `proposer_selector`: The proposer selection algorithm to use
438    ///
439    /// ## Example
440    ///
441    /// ```rust
442    /// let validator_sets = Arc::new(StaticValidatorSetProvider::new(validator_set));
443    /// let mut consensus = Consensus::recover(config, validator_sets)?;
444    /// ```
445    pub fn recover_with_proposal_selector<
446        VS: ValidatorSetProvider<A> + 'static,
447        PS: ProposerSelector<A> + Send + Sync + 'static,
448    >(
449        config: Config<A>,
450        validator_sets: Arc<VS>,
451        proposer_selector: PS,
452        highest_committed: Option<u64>,
453    ) -> anyhow::Result<Consensus<V, A, PS>> {
454        Self::recover_inner(
455            Self::with_proposer_selector(config.clone(), proposer_selector),
456            config,
457            validator_sets,
458            highest_committed,
459        )
460    }
461
462    fn recover_inner<
463        VS: ValidatorSetProvider<A> + 'static,
464        PS: ProposerSelector<A> + Send + Sync + 'static,
465    >(
466        mut consensus: Consensus<V, A, PS>,
467        config: Config<A>,
468        validator_sets: Arc<VS>,
469        highest_committed: Option<u64>,
470    ) -> anyhow::Result<Consensus<V, A, PS>> {
471        use crate::wal::recovery;
472
473        tracing::info!(
474            validator = ?config.address,
475            wal_dir = %config.wal_dir.display(),
476            "Starting consensus recovery from WAL"
477        );
478
479        // Read the write-ahead log and recover all incomplete heights.
480        // This also returns finalized heights and the highest Decision height found
481        // (even in finalized heights).
482        let (incomplete_heights, finalized_heights, highest_decision) =
483            match recovery::recover_incomplete_heights(&config.wal_dir, highest_committed) {
484                Ok((incomplete, finalized, decision_height)) => {
485                    tracing::info!(
486                        validator = ?config.address,
487                        incomplete_heights = incomplete.len(),
488                        finalized_heights = finalized.len(),
489                        highest_decision = ?decision_height,
490                        "Found incomplete and finalized heights in WAL"
491                    );
492                    (incomplete, finalized, decision_height)
493                }
494                Err(e) => {
495                    tracing::error!(
496                        validator = ?config.address,
497                        wal_dir = %config.wal_dir.display(),
498                        error = %e,
499                        "Failed to recover incomplete heights from WAL"
500                    );
501                    (Vec::new(), Vec::new(), None)
502                }
503            };
504
505        // Set last_decided_height from the highest Decision found during recovery.
506        consensus.last_decided_height = highest_decision;
507        if let Some(h) = highest_decision {
508            tracing::info!(
509                validator = ?consensus.config.address,
510                last_decided_height = %h,
511                "Set last_decided_height from WAL recovery"
512            );
513        }
514
515        // Determine the maximum height we're recovering (incomplete or finalized).
516        let max_height = incomplete_heights
517            .iter()
518            .chain(finalized_heights.iter())
519            .map(|(height, _)| *height)
520            .max()
521            .or(highest_decision);
522
523        // Calculate the minimum height to keep based on history_depth.
524        // This matches the pruning logic used during normal operation.
525        let min_height_to_restore =
526            max_height.and_then(|max| max.checked_sub(config.history_depth));
527
528        // Restore finalized heights that are within history_depth.
529        // This ensures we can accept votes for these heights, matching the behavior
530        // during normal operation where finalized heights remain in memory until
531        // pruned.
532        for (height, entries) in finalized_heights {
533            // Only restore finalized heights that are within history_depth.
534            // (if max_height < history_depth, restore all finalized heights)
535            let should_restore = min_height_to_restore
536                .map(|min| height >= min)
537                .unwrap_or(true);
538
539            if should_restore {
540                tracing::info!(
541                    validator = ?consensus.config.address,
542                    height = %height,
543                    "Restoring finalized height within history_depth"
544                );
545
546                let validator_set = validator_sets.get_validator_set(height)?;
547                let mut internal_consensus = consensus.create_consensus(height, &validator_set);
548
549                // Recover from WAL first to restore the engine state.
550                let vote_round = internal_consensus.recover_from_wal(entries);
551                if let Some(round) = vote_round {
552                    // Schedule rebroadcast timeout.
553                    // See https://github.com/equilibriumco/pathfinder/issues/3286 for motivation.
554                    internal_consensus.schedule_rebroadcast(round);
555                }
556
557                // Only call StartHeight if the height is not already finalized.
558                if !internal_consensus.is_finalized() {
559                    internal_consensus
560                        .handle_command(ConsensusCommand::StartHeight(height, validator_set));
561                }
562
563                consensus.internal.insert(height, internal_consensus);
564            } else {
565                tracing::debug!(
566                    validator = ?consensus.config.address,
567                    height = %height,
568                    min_height = ?min_height_to_restore,
569                    "Skipping finalized height outside history_depth"
570                );
571            }
572        }
573
574        // Manually recover all incomplete heights.
575        for (height, entries) in incomplete_heights {
576            tracing::info!(
577                validator = ?consensus.config.address,
578                height = %height,
579                entry_count = entries.len(),
580                "Recovering incomplete height from WAL"
581            );
582
583            let validator_set = validator_sets.get_validator_set(height)?;
584            let mut internal_consensus = consensus.create_consensus(height, &validator_set);
585            internal_consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
586            let vote_round = internal_consensus.recover_from_wal(entries);
587            if let Some(round) = vote_round {
588                // Schedule rebroadcast timeout.
589                internal_consensus.schedule_rebroadcast(round);
590            }
591            consensus.internal.insert(height, internal_consensus);
592        }
593
594        // Set min_kept_height to match what we've restored, so that is_height_finalized
595        // correctly identifies finalized heights that were pruned.
596        consensus.min_kept_height = min_height_to_restore;
597
598        tracing::info!(
599            validator = ?consensus.config.address,
600            recovered_heights = consensus.internal.len(),
601            last_decided_height = ?consensus.last_decided_height,
602            min_kept_height = ?consensus.min_kept_height,
603            "Completed consensus recovery"
604        );
605
606        Ok(consensus)
607    }
608
609    fn create_consensus(
610        &mut self,
611        height: u64,
612        validator_set: &ValidatorSet<A>,
613    ) -> InternalConsensus<V, A, P> {
614        let params = InternalParams {
615            height,
616            validator_set: validator_set.clone(),
617            address: self.config.address.clone(),
618            threshold_params: self.config.threshold_params,
619            value_payload: malachite_types::ValuePayload::ProposalOnly,
620        };
621
622        // Create a WAL for the height. If we fail, use a NoopWal.
623        let wal = match FileWalSink::new(&self.config.address, height, &self.config.wal_dir) {
624            Ok(wal) => Box::new(wal) as Box<dyn WalSink<V, A>>,
625            Err(e) => {
626                tracing::error!(
627                    validator = ?self.config.address,
628                    height = %height,
629                    error = %e,
630                    "Failed to create wal for height"
631                );
632                Box::new(NoopWal)
633            }
634        };
635
636        // A new consensus is created for every new height.
637        InternalConsensus::new(
638            params,
639            self.config.timeout_values.clone(),
640            wal,
641            self.proposer_selector.clone(),
642        )
643    }
644
645    /// Feed a command into the consensus engine.
646    ///
647    /// This method is the primary way to interact with the consensus engine.
648    /// Commands include starting new heights, submitting proposals, and
649    /// processing votes.
650    ///
651    /// ## Arguments
652    ///
653    /// - `cmd`: The command to process
654    ///
655    /// ## Example
656    ///
657    /// ```rust
658    /// // Start a new height
659    /// consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
660    ///
661    /// // Submit a proposal
662    /// consensus.handle_command(ConsensusCommand::Proposal(signed_proposal));
663    ///
664    /// // Process a vote
665    /// consensus.handle_command(ConsensusCommand::Vote(signed_vote));
666    /// ```
667    pub fn handle_command(&mut self, cmd: ConsensusCommand<V, A>) {
668        match cmd {
669            // Start a new height.
670            ConsensusCommand::StartHeight(height, validator_set) => {
671                // A new consensus is created for every new height.
672                let mut consensus = self.create_consensus(height, &validator_set);
673                consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
674                self.internal.insert(height, consensus);
675                tracing::debug!(
676                    validator = ?self.config.address,
677                    height = %height,
678                    "Started new consensus"
679                );
680            }
681            other => {
682                let height = other.height();
683                if let Some(engine) = self.internal.get_mut(&height) {
684                    engine.handle_command(other);
685                } else {
686                    tracing::warn!(
687                        validator = ?self.config.address,
688                        height = %height,
689                        command = ?other,
690                        "Received command for unknown height"
691                    );
692                }
693            }
694        }
695    }
696
697    /// Poll all engines for an event.
698    ///
699    /// This method should be called regularly to process events from the
700    /// consensus engine. Events include requests for proposals, decisions,
701    /// gossip messages, and errors.
702    ///
703    /// ## Returns
704    ///
705    /// Returns `Some(event)` if an event is available, or `None` if no events
706    /// are ready.
707    ///
708    /// ## Example
709    ///
710    /// ```rust
711    /// while let Some(event) = consensus.next_event().await {
712    ///     match event {
713    ///         ConsensusEvent::RequestProposal { height, round } => {
714    ///             // Build and submit a proposal
715    ///         }
716    ///         ConsensusEvent::Decision { height, value } => {
717    ///             // Consensus reached, process the value
718    ///         }
719    ///         ConsensusEvent::Gossip(message) => {
720    ///             // Send message to peers
721    ///         }
722    ///         ConsensusEvent::Error(error) => {
723    ///             // Handle error
724    ///         }
725    ///     }
726    /// }
727    /// ```
728    pub async fn next_event(&mut self) -> Option<ConsensusEvent<V, A>> {
729        let mut finished_heights = Vec::new();
730        // Drain each internal engine.
731        for (height, engine) in self.internal.iter_mut() {
732            if let Some(event) = engine.poll_internal().await {
733                tracing::trace!(
734                    validator = ?self.config.address,
735                    height = %height,
736                    event = ?event,
737                    "Engine returned event"
738                );
739                // Track finished heights and update last_decided_height.
740                if let ConsensusEvent::Decision { height, .. } = &event {
741                    finished_heights.push(*height);
742                    // Update last_decided_height to track the highest decided height.
743                    self.last_decided_height = Some(
744                        self.last_decided_height
745                            .map(|h| h.max(*height))
746                            .unwrap_or(*height),
747                    );
748                }
749                // Push the event to the queue.
750                self.event_queue.push_back(event);
751            }
752        }
753
754        // Prune old engines if we have any finished heights.
755        if !finished_heights.is_empty() {
756            self.prune_old_engines();
757        }
758
759        // Return the first event from the queue.
760        self.event_queue.pop_front()
761    }
762
763    /// Prune old engines from the internal map.
764    fn prune_old_engines(&mut self) {
765        let max_height = self.internal.keys().max().copied();
766        if let Some(max_height) = max_height {
767            let new_min_height = max_height.checked_sub(self.config.history_depth);
768
769            if let Some(new_min) = new_min_height {
770                // Collect heights that will be pruned (before we remove them from the map).
771                let pruned_heights: Vec<u64> = self
772                    .internal
773                    .keys()
774                    .filter(|height| **height < new_min)
775                    .copied()
776                    .collect();
777
778                // Prune the internal map and set the new min_kept_height.
779                self.min_kept_height = Some(new_min);
780                self.internal.retain(|height, _| *height >= new_min);
781
782                // Delete WAL files for pruned heights.
783                for height in &pruned_heights {
784                    if let Err(e) =
785                        delete_wal_file(&self.config.address, *height, &self.config.wal_dir)
786                    {
787                        tracing::warn!(
788                            validator = ?self.config.address,
789                            height = %height,
790                            error = %e,
791                            "Failed to delete WAL file for pruned height"
792                        );
793                    }
794                }
795
796                tracing::debug!(
797                    validator = ?self.config.address,
798                    min_height = %new_min,
799                    max_height = %max_height,
800                    pruned_count = pruned_heights.len(),
801                    "Pruned old consensus engines and deleted WAL files"
802                );
803            }
804        }
805    }
806
807    /// Check if a specific height has been finalized (i.e., a decision has been
808    /// reached)
809    ///
810    /// ## Arguments
811    ///
812    /// - `height`: The height to check
813    ///
814    /// ## Returns
815    ///
816    /// Returns `true` if the height has been finalized, `false` otherwise.
817    ///
818    /// ## Example
819    ///
820    /// ```rust
821    /// if consensus.is_height_finalized(height) {
822    ///     println!("Height {} has been finalized", height);
823    /// }
824    /// ```
825    pub fn is_height_finalized(&self, height: u64) -> bool {
826        if let Some(engine) = self.internal.get(&height) {
827            engine.is_finalized()
828        } else {
829            // If the height is not in our internal map, it might have been pruned
830            // after being finalized, so we assume it's finalized
831            if let Some(min_height) = self.min_kept_height {
832                if height < min_height {
833                    return true;
834                }
835            }
836            false
837        }
838    }
839
840    /// Check if a specific height is actively tracked by the consensus engine.
841    ///
842    /// ## Arguments
843    ///
844    /// - `height`: The height to check
845    ///
846    /// ## Returns
847    ///
848    /// Returns `true` if the height is active, `false` otherwise.
849    pub fn is_height_active(&self, height: u64) -> bool {
850        self.internal.contains_key(&height)
851    }
852
853    /// Get the maximum height actively being tracked by the consensus engine.
854    ///
855    /// This returns the highest height that consensus is currently working on,
856    /// which includes incomplete heights that haven't reached a decision yet.
857    /// Returns `None` if there are no actively tracked heights.
858    pub fn max_active_height(&self) -> Option<u64> {
859        self.internal.keys().max().copied()
860    }
861
862    /// Get the highest height that consensus has decided on.
863    ///
864    /// This returns the highest height that has a Decision entry, even if that
865    /// height is no longer actively tracked (e.g., after recovery when it was
866    /// skipped).
867    ///
868    /// Returns `None` if no decisions have been made yet.
869    pub fn last_decided_height(&self) -> Option<u64> {
870        self.last_decided_height
871    }
872}
873
874/// A round number (or `None` if the round is nil).
875///
876/// This type represents a consensus round number. A round can be either a
877/// specific round number or nil (None), which represents a special state in the
878/// consensus protocol.
879///
880/// ## Example
881///
882/// ```rust
883/// let round = Round::new(5); // Round 5
884/// let nil_round = Round::nil(); // Nil round
885/// ```
886#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
887pub struct Round(pub Option<u32>);
888
889impl Round {
890    /// Create a new round with the given round number.
891    pub fn new(round: u32) -> Self {
892        Self(Some(round))
893    }
894
895    /// Create a nil round.
896    pub fn nil() -> Self {
897        Self(None)
898    }
899
900    /// Get the round number as a u32, if it's not nil.
901    pub fn as_u32(&self) -> Option<u32> {
902        self.0
903    }
904}
905
906impl From<u32> for Round {
907    fn from(round: u32) -> Self {
908        Self::new(round)
909    }
910}
911
912impl Add<u32> for Round {
913    type Output = Self;
914
915    fn add(self, rhs: u32) -> Self::Output {
916        Self(self.0.map(|round| round + rhs))
917    }
918}
919
920impl Sub<u32> for Round {
921    type Output = Self;
922
923    fn sub(self, rhs: u32) -> Self::Output {
924        Self(self.0.map(|round| round - rhs))
925    }
926}
927
928impl Display for Round {
929    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
930        match self.0 {
931            Some(round) => write!(f, "{round}"),
932            None => write!(f, "Nil"),
933        }
934    }
935}
936
937impl Debug for Round {
938    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
939        write!(f, "{self}")
940    }
941}
942
943/// A proposal for a block value in a consensus round.
944///
945/// A proposal is created by the designated proposer for a given height and
946/// round. It contains the proposed block value along with additional metadata.
947#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
948pub struct Proposal<V, A> {
949    /// The blockchain height
950    pub height: u64,
951    /// The consensus round number
952    pub round: Round,
953    /// The proposed consensus value
954    pub value: V,
955    /// The POL round for which the proposal is for
956    pub pol_round: Round,
957    /// The address of the proposer
958    pub proposer: A,
959}
960
961impl<V: Debug, A: Debug> std::fmt::Debug for Proposal<V, A> {
962    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
963        write!(
964            f,
965            "H:{} R:{} From:{:?} Val:{:?}",
966            self.height, self.round, self.proposer, self.value
967        )
968    }
969}
970
971/// The type of vote.
972#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
973pub enum VoteType {
974    /// A preliminary vote
975    Prevote,
976    /// A final vote that commits to a value
977    Precommit,
978}
979
980/// A vote for a value in a consensus round.
981///
982/// A vote is cast by a validator to indicate their agreement or disagreement
983/// with a proposed block value. The vote includes the validator's address, the
984/// round number, and the block value being voted on.
985#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
986pub struct Vote<V, A> {
987    /// The type of vote (Prevote or Precommit)
988    pub r#type: VoteType,
989    /// The blockchain height
990    pub height: u64,
991    /// The consensus round number
992    pub round: Round,
993    /// The value being voted on (None for nil votes)
994    pub value: Option<V>,
995    /// The address of the validator casting the vote
996    pub validator_address: A,
997}
998
999impl<V: Debug, A: Debug> std::fmt::Debug for Vote<V, A> {
1000    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1001        let val = match &self.value {
1002            Some(val) => format!("{val:?}"),
1003            None => "Nil".to_string(),
1004        };
1005        write!(
1006            f,
1007            "H:{} R:{} {:?} From:{:?} Val:{val}",
1008            self.height, self.round, self.r#type, self.validator_address
1009        )
1010    }
1011}
1012
1013impl<V, A> Vote<V, A> {
1014    /// Check if the vote is nil.
1015    ///
1016    /// A nil vote is a vote that does not commit to a value.
1017    pub fn is_nil(&self) -> bool {
1018        self.value.is_none()
1019    }
1020}
1021
1022/// A fully validated, signed proposal ready to enter consensus.
1023///
1024/// This type wraps a proposal with a cryptographic signature to ensure
1025/// authenticity and integrity.
1026#[derive(Clone, Serialize, Deserialize)]
1027pub struct SignedProposal<V, A> {
1028    pub proposal: Proposal<V, A>,
1029    pub signature: Signature,
1030}
1031
1032/// A signed vote.
1033///
1034/// This type wraps a vote with a cryptographic signature to ensure
1035/// authenticity and integrity.
1036#[derive(Clone, Serialize, Deserialize)]
1037pub struct SignedVote<V, A> {
1038    pub vote: Vote<V, A>,
1039    pub signature: Signature,
1040}
1041
1042// Note: We intentionally ignore the signature as it's not used yet.
1043impl<V: Debug, A: Debug> std::fmt::Debug for SignedProposal<V, A> {
1044    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1045        write!(f, "{:?}", self.proposal)
1046    }
1047}
1048
1049// Note: We intentionally ignore the signature as it's not used yet.
1050impl<V: Debug, A: Debug> std::fmt::Debug for SignedVote<V, A> {
1051    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1052        write!(f, "{:?}", self.vote)
1053    }
1054}
1055
1056/// A public key for the consensus protocol.
1057///
1058/// This type is used to verify signatures on proposals and votes in the
1059/// consensus protocol. Each validator has an associated public key that is used
1060/// to authenticate their messages.
1061pub type PublicKey = malachite_signing_ed25519::PublicKey;
1062
/// The voting power of a validator, expressed as an unsigned 64-bit integer.
pub type VotingPower = u64;
1065
1066/// A validator in the consensus protocol.
1067///
1068/// Each validator has an associated address and public key to uniquely identify
1069/// them. The voting power determines their weight in consensus decisions.
1070#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
1071pub struct Validator<A> {
1072    /// The validator's address
1073    pub address: A,
1074    /// The validator's public key for signature verification
1075    pub public_key: PublicKey,
1076    /// The validator's voting power (weight in consensus)
1077    pub voting_power: VotingPower,
1078}
1079
1080impl<A: Debug> std::fmt::Debug for Validator<A> {
1081    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1082        write!(f, "{:?} ({})", self.address, self.voting_power)
1083    }
1084}
1085
1086impl<A> Validator<A> {
1087    /// Create a new validator with the given address and public key.
1088    ///
1089    /// The voting power defaults to 1.
1090    pub fn new(address: A, public_key: PublicKey) -> Self {
1091        Self {
1092            address,
1093            public_key,
1094            voting_power: 1,
1095        }
1096    }
1097
1098    /// Set the voting power for the validator.
1099    ///
1100    /// This method returns `self` for method chaining.
1101    pub fn with_voting_power(mut self, voting_power: VotingPower) -> Self {
1102        self.voting_power = voting_power;
1103        self
1104    }
1105}
1106
1107/// A validator set represents a group of consensus participants.
1108///
1109/// The validator set defines who can participate in consensus at a given
1110/// height. Each validator in the set has a voting power that determines their
1111/// weight in consensus decisions.
1112#[derive(Clone, Debug, PartialEq, Eq)]
1113pub struct ValidatorSet<A> {
1114    /// The list of validators in the set
1115    pub validators: Vec<Validator<A>>,
1116}
1117
1118impl<A: Clone + Ord> ValidatorSet<A> {
1119    /// Create a new validator set with the given validators.
1120    pub fn new(validators: impl IntoIterator<Item = Validator<A>>) -> Self {
1121        // Ensure validators are unique by address.
1122        let validators: BTreeMap<A, Validator<A>> = validators
1123            .into_iter()
1124            .map(|v| (v.address.clone(), v))
1125            .collect();
1126        assert!(!validators.is_empty());
1127        let validators = validators.into_values().collect::<Vec<Validator<A>>>();
1128        Self { validators }
1129    }
1130
1131    /// Get the number of validators in the set.
1132    pub fn count(&self) -> usize {
1133        self.validators.len()
1134    }
1135}
1136
1137/// Commands that the application can send into the consensus engine.
1138///
1139/// These commands represent the primary interface for interacting with the
1140/// consensus engine. They allow the application to start new heights,
1141/// submit proposals, and process votes.
1142pub enum ConsensusCommand<V, A> {
1143    /// Start consensus at a given height with the validator set.
1144    StartHeight(u64, ValidatorSet<A>),
1145    /// A complete, locally validated and signed proposal that we create as the
1146    /// proposer for the current round.
1147    Propose(Proposal<V, A>),
1148    /// A complete, locally validated and signed proposal that was received over
1149    /// the network from another validator.
1150    Proposal(SignedProposal<V, A>),
1151    /// A signed vote received from the network.
1152    Vote(SignedVote<V, A>),
1153}
1154
1155impl<V, A> ConsensusCommand<V, A> {
1156    /// Get the consensus height associated with the command.
1157    pub fn height(&self) -> u64 {
1158        match self {
1159            ConsensusCommand::StartHeight(height, _) => *height,
1160            ConsensusCommand::Propose(proposal) => proposal.height,
1161            ConsensusCommand::Proposal(proposal) => proposal.proposal.height,
1162            ConsensusCommand::Vote(vote) => vote.vote.height,
1163        }
1164    }
1165}
1166
1167impl<V: Debug, A: Debug> std::fmt::Debug for ConsensusCommand<V, A> {
1168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1169        match self {
1170            ConsensusCommand::StartHeight(height, validator_set) => write!(
1171                f,
1172                "StartHeight({}, [{}])",
1173                height,
1174                validator_set
1175                    .validators
1176                    .iter()
1177                    .map(|v| format!("{:?}", v.address))
1178                    .collect::<Vec<_>>()
1179                    .join(", ")
1180            ),
1181            ConsensusCommand::Propose(proposal) => write!(f, "Propose({proposal:?})"),
1182            ConsensusCommand::Proposal(proposal) => write!(f, "Proposal({proposal:?})"),
1183            ConsensusCommand::Vote(vote) => write!(f, "Vote({vote:?})"),
1184        }
1185    }
1186}
1187
1188/// A message to be gossiped to peers.
1189///
1190/// These messages represent network communication that needs to be sent to
1191/// other validators in the network.
1192#[derive(Clone, Debug)]
1193pub enum NetworkMessage<V, A> {
1194    /// A complete, locally validated and signed proposal ready to be gossiped.
1195    Proposal(SignedProposal<V, A>),
1196    /// A vote received from the network.
1197    Vote(SignedVote<V, A>),
1198}
1199
1200/// Events that the consensus engine emits for the application to handle.
1201///
1202/// These events represent the output of the consensus engine and tell the
1203/// application what actions it needs to take.
1204pub enum ConsensusEvent<V, A> {
1205    /// The consensus wants this message to be gossiped to peers.
1206    ///
1207    /// The application should send this message to all peers in the network.
1208    Gossip(NetworkMessage<V, A>),
1209    /// The consensus needs the app to build and inject a proposal.
1210    ///
1211    /// The application should create a proposal for the given height and round,
1212    /// then submit it to the consensus engine.
1213    RequestProposal { height: u64, round: u32 },
1214    /// The consensus has reached a decision and committed a block.
1215    ///
1216    /// This event indicates that consensus has been reached for the given
1217    /// height and the value should be committed to the blockchain.
1218    Decision { height: u64, round: u32, value: V },
1219    /// An internal error occurred in consensus.
1220    ///
1221    /// The application should handle this error appropriately, possibly by
1222    /// logging it or taking corrective action.
1223    Error(ConsensusError),
1224}
1225
1226impl<V: Debug, A: Debug> std::fmt::Debug for ConsensusEvent<V, A> {
1227    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1228        match self {
1229            ConsensusEvent::Gossip(msg) => match msg {
1230                NetworkMessage::Proposal(proposal) => write!(f, "Gossip(Proposal({proposal:?}))"),
1231                NetworkMessage::Vote(vote) => write!(f, "Gossip(Vote({vote:?}))"),
1232            },
1233            ConsensusEvent::RequestProposal { height, round, .. } => {
1234                write!(f, "RequestProposal(H:{height} R:{round})")
1235            }
1236            ConsensusEvent::Decision {
1237                height,
1238                round,
1239                value,
1240            } => {
1241                write!(f, "Decision(H:{height} R: {round} Val:{value:?})")
1242            }
1243            ConsensusEvent::Error(error) => write!(f, "Error({error:?})"),
1244        }
1245    }
1246}
1247
1248/// A trait for retrieving the validator set at a specific blockchain height.
1249///
1250/// This trait allows consensus to dynamically determine the set of validators
1251/// that are eligible to participate in consensus at any given height.
1252///
1253/// This is useful for handling validator set changes across heights.
1254pub trait ValidatorSetProvider<A> {
1255    fn get_validator_set(&self, height: u64) -> Result<ValidatorSet<A>, anyhow::Error>;
1256}
1257
1258/// A trait for selecting the proposer for a given height and round.
1259///
1260/// This trait allows consumers to provide custom proposer selection logic
1261/// instead of using the default round-robin approach. This is useful for
1262/// implementing more sophisticated proposer selection algorithms like
1263/// VRF-based selection, weighted selection, or other custom logic.
1264///
1265/// ## Example Implementation
1266///
1267/// ```rust
1268/// use pathfinder_consensus::*;
1269///
1270/// struct RoundRobinProposerSelector;
1271///
1272/// impl<A: ValidatorAddress> ProposerSelector<A> for RoundRobinProposerSelector {
1273///     fn select_proposer<'a>(
1274///         &self,
1275///         validator_set: &'a ValidatorSet<A>,
1276///         height: u64,
1277///         round: u32,
1278///     ) -> &'a Validator<A> {
1279///         let index = round as usize % validator_set.count();
1280///         &validator_set.validators[index]
1281///     }
1282/// }
1283/// ```
1284pub trait ProposerSelector<A: ValidatorAddress>: Clone + Send + Sync {
1285    /// Select the proposer for the given height and round.
1286    ///
1287    /// ## Arguments
1288    ///
1289    /// - `validator_set`: The set of validators eligible to propose
1290    /// - `height`: The blockchain height
1291    /// - `round`: The consensus round number
1292    ///
1293    /// ## Returns
1294    ///
1295    /// Returns a reference to the selected validator who should propose
1296    /// for the given height and round.
1297    fn select_proposer<'a>(
1298        &self,
1299        validator_set: &'a ValidatorSet<A>,
1300        height: u64,
1301        round: u32,
1302    ) -> &'a Validator<A>;
1303}
1304
/// The default proposer selector: plain round-robin.
///
/// Proposers are picked in a round-robin fashion, using the round number
/// modulo the number of validators as the index into the validator set.
#[derive(Clone, Default)]
pub struct RoundRobinProposerSelector;
1312
1313impl<A: ValidatorAddress> ProposerSelector<A> for RoundRobinProposerSelector {
1314    fn select_proposer<'a>(
1315        &self,
1316        validator_set: &'a ValidatorSet<A>,
1317        _height: u64,
1318        round: u32,
1319    ) -> &'a Validator<A> {
1320        let index = round as usize % validator_set.count();
1321        &validator_set.validators[index]
1322    }
1323}
1324
1325/// A type alias for consensus with the default round-robin proposer selector.
1326///
1327/// This provides a convenient way to create consensus instances without
1328/// specifying the proposer selector type.
1329pub type DefaultConsensus<V, A> = Consensus<V, A, RoundRobinProposerSelector>;
1330
1331/// A validator set provider that always returns the same validator set.
1332///
1333/// This is a simple implementation of `ValidatorSetProvider` that returns
1334/// the same validator set for all heights. This is useful for testing or
1335/// for applications where the validator set doesn't change.
1336pub struct StaticValidatorSetProvider<A> {
1337    validator_set: ValidatorSet<A>,
1338}
1339
1340impl<A> StaticValidatorSetProvider<A> {
1341    /// Create a new static validator set provider.
1342    ///
1343    /// ## Arguments
1344    ///
1345    /// - `validator_set`: The validator set to return for all heights
1346    pub fn new(validator_set: ValidatorSet<A>) -> Self {
1347        Self { validator_set }
1348    }
1349}
1350
1351impl<A: Clone + Send + Sync> ValidatorSetProvider<A> for StaticValidatorSetProvider<A> {
1352    fn get_validator_set(&self, _height: u64) -> Result<ValidatorSet<A>, anyhow::Error> {
1353        Ok(self.validator_set.clone())
1354    }
1355}
1356
#[cfg(test)]
mod tests {
    use super::*;

    /// Building a set from validators with repeated addresses must leave
    /// exactly one validator per address, ordered by address.
    #[test]
    fn regression_validator_set_is_unique_by_address() {
        let addresses = [1, 1, 2, 2, 2, 3, 3, 3, 3, 2, 1, 1, 2, 3, 2, 2, 1, 1, 3, 3];
        let set = ValidatorSet::new(
            addresses
                .into_iter()
                .map(|address| Validator::new(address, crate::PublicKey::from_bytes([0; 32]))),
        );

        let unique_addresses: Vec<_> = set.validators.iter().map(|v| v.address).collect();
        assert_eq!(unique_addresses, vec![1, 2, 3]);
    }
}