pathfinder_consensus/lib.rs
1//! # Pathfinder Consensus
2//!
3//! A Byzantine Fault Tolerant (BFT) consensus engine for Starknet nodes.
4//!
5//! ## Overview
6//!
7//! This crate provides a consensus engine for Starknet nodes that wraps the
8//! Malachite implementation of the Tendermint BFT consensus algorithm. It's
9//! designed to be generic over validator addresses and consensus values, making
10//! it suitable for Starknet's consensus requirements.
11//!
12//! ## Core Concepts
13//!
14//! ### ValidatorAddress Trait
15//!
16//! Your validator address type must implement the `ValidatorAddress` trait,
17//! which requires:
18//! - `Sync + Send`: Thread-safe and sendable across threads
19//! - `Ord + Display + Debug + Default + Clone`: Standard Rust traits for
20//! ordering, display, debugging, default values, and cloning
21//! - `Into<Vec<u8>>`: Convertible to bytes for serialization
22//! - `Serialize + DeserializeOwned`: Serde serialization support
23//!
24//! ### ValuePayload Trait
25//!
26//! Your consensus value type must implement the `ValuePayload` trait, which
27//! requires:
28//! - `Sync + Send`: Thread-safe and sendable across threads
29//! - `Ord + Display + Debug + Default + Clone`: Standard Rust traits
30//! - `Serialize + DeserializeOwned`: Serde serialization support
31//!
32//! ### Consensus Engine
33//!
34//! The main `Consensus<V, A>` struct is generic over:
35//! - `V`: Your consensus value type (must implement `ValuePayload`)
36//! - `A`: Your validator address type (must implement `ValidatorAddress`)
37//!
38//! ## Usage Example
39//!
40//! ```rust
41//! use pathfinder_consensus::*;
42//! use serde::{Deserialize, Serialize};
43//!
44//! // Define your validator address type
45//! #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
46//! struct MyAddress(String);
47//!
48//! impl std::fmt::Display for MyAddress {
49//! fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50//! write!(f, "{}", self.0)
51//! }
52//! }
53//!
54//! impl From<MyAddress> for Vec<u8> {
55//! fn from(addr: MyAddress) -> Self {
56//! addr.0.into_bytes()
57//! }
58//! }
59//!
60//! // Define your consensus value type
61//! #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
62//! struct BlockData(String);
63//!
64//! impl std::fmt::Display for BlockData {
65//! fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66//! write!(f, "{}", self.0)
67//! }
68//! }
69//!
70//! // Custom proposer selector that uses weighted selection
71//! #[derive(Clone)]
72//! struct WeightedProposerSelector;
73//!
74//! impl ProposerSelector<MyAddress> for WeightedProposerSelector {
75//! fn select_proposer<'a>(
76//! &self,
77//! validator_set: &'a ValidatorSet<MyAddress>,
78//! _height: u64,
79//! round: u32,
80//! ) -> &'a Validator<MyAddress> {
81//! // Simple weighted selection based on voting power
82//! let total_power: u64 = validator_set
83//! .validators
84//! .iter()
85//! .map(|v| v.voting_power)
86//! .sum();
87//! let selection = (round as u64) % total_power;
88//!
89//! let mut cumulative = 0;
90//! for validator in &validator_set.validators {
91//! cumulative += validator.voting_power;
92//! if selection < cumulative {
93//! return validator;
94//! }
95//! }
96//!
97//! // Fallback to first validator
98//! &validator_set.validators[0]
99//! }
100//! }
101//!
102//! #[tokio::main]
103//! async fn main() {
104//! // Create configuration
105//! let my_address = MyAddress("validator_1".to_string());
106//! let config = Config::new(my_address.clone());
107//!
108//! // Create consensus engine with custom proposer selector
109//! let proposer_selector = WeightedProposerSelector;
110//! let mut consensus = Consensus::with_proposer_selector(config, proposer_selector);
111//!
112//! // Or use the default round-robin selector (no additional configuration needed)
113//! // let mut consensus = Consensus::new(config);
114//!
115//! // Start consensus at height 1
116//! let validator_set = ValidatorSet::new(vec![Validator::new(
117//! my_address.clone(),
118//! PublicKey::from_bytes([0; 32]),
119//! )
120//! .with_voting_power(10)]);
121//!
122//! consensus.handle_command(ConsensusCommand::StartHeight(1, validator_set));
123//!
124//! // Poll for events
125//! while let Some(event) = consensus.next_event().await {
126//! match event {
127//! ConsensusEvent::RequestProposal { height, round } => {
128//! println!("Need to propose at height {}, round {}", height, round);
129//! }
130//! ConsensusEvent::Decision { height, value } => {
131//! println!("Consensus reached at height {}: {:?}", height, value);
132//! }
133//! ConsensusEvent::Gossip(message) => {
134//! println!("Need to gossip: {:?}", message);
135//! }
136//! ConsensusEvent::Error(error) => {
137//! eprintln!("Consensus error: {}", error);
138//! }
139//! }
140//! }
141//! }
142//! ```
143//!
144//! ## Commands and Events
145//!
146//! The consensus engine operates on a command/event model:
147//!
148//! - **Commands**: Send commands to the consensus engine via `handle_command()`
149//! - **Events**: Poll for events from the consensus engine via
150//! `next_event().await`
151//!
152//! ## Crash Recovery
153//!
154//! The consensus engine supports crash recovery through write-ahead logging:
155//!
156//! ```rust
157//! // Recover from a previous crash
158//! let validator_sets = Arc::new(StaticValidatorSetProvider::new(validator_set));
159//! let mut consensus = Consensus::recover(config, validator_sets);
160//! ```
161
162use std::collections::{BTreeMap, HashMap, VecDeque};
163use std::fmt::{Debug, Display};
164use std::ops::{Add, Sub};
165use std::sync::Arc;
166
167use serde::de::DeserializeOwned;
168use serde::{Deserialize, Serialize};
169
170// Re-export consensus types needed by the public API
171pub use crate::config::{Config, TimeoutValues};
172pub use crate::error::ConsensusError;
173use crate::internal::{InternalConsensus, InternalParams};
174use crate::wal::{delete_wal_file, FileWalSink, NoopWal, WalSink};
175
176mod config;
177mod error;
178mod internal;
179mod wal;
180
181/// A cryptographic signature for consensus messages.
182///
183/// This type is used to sign proposals and votes in the consensus protocol
184/// to ensure authenticity and integrity of consensus messages.
185pub type Signature = malachite_signing_ed25519::Signature;
186
187/// An Ed25519 signing key.
188///
189/// This is also called a secret key by other implementations.
190pub type SigningKey = ed25519_consensus::SigningKey;
191
192/// A trait for consensus validator addresses.
193///
194/// This trait defines the requirements for validator address types used in the
195/// consensus engine. Your validator address type must implement all the
196/// required traits to be compatible with the consensus engine.
197///
198/// ## Required Traits
199///
200/// - `Sync + Send`: Thread-safe and sendable across threads
201/// - `Ord + Display + Debug + Default + Clone`: Standard Rust traits for
202/// ordering, display, debugging, default values, and cloning
203/// - `Into<Vec<u8>>`: Convertible to bytes for serialization
204/// - `Serialize + DeserializeOwned`: Serde serialization support
205///
206/// ## Example Implementation
207///
208/// ```rust
209/// #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
210/// struct MyAddress(String);
211///
212/// impl std::fmt::Display for MyAddress {
213/// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214/// write!(f, "{}", self.0)
215/// }
216/// }
217///
218/// impl From<MyAddress> for Vec<u8> {
219/// fn from(addr: MyAddress) -> Self {
220/// addr.0.into_bytes()
221/// }
222/// }
223/// ```
224pub trait ValidatorAddress:
225 Sync + Send + Ord + Display + Debug + Default + Clone + Into<Vec<u8>> + Serialize + DeserializeOwned
226{
227}
228impl<T> ValidatorAddress for T where
229 T: Sync
230 + Send
231 + Ord
232 + Display
233 + Debug
234 + Default
235 + Clone
236 + Into<Vec<u8>>
237 + Serialize
238 + DeserializeOwned
239{
240}
241
242/// A trait for consensus value payloads.
243///
244/// This trait defines the requirements for consensus value types used in the
245/// consensus engine. Your consensus value type must implement all the required
246/// traits to be compatible with the consensus engine.
247///
248/// ## Required Traits
249///
250/// - `Sync + Send`: Thread-safe and sendable across threads
251/// - `Ord + Display + Debug + Default + Clone`: Standard Rust traits
252/// - `Serialize + DeserializeOwned`: Serde serialization support
253///
254/// ## Example Implementation
255///
256/// ```rust
257/// #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
258/// struct BlockData(String);
259///
260/// impl std::fmt::Display for BlockData {
261/// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262/// write!(f, "{}", self.0)
263/// }
264/// }
265/// ```
266pub trait ValuePayload:
267 Sync + Send + Ord + Display + Debug + Default + Clone + Serialize + DeserializeOwned
268{
269}
270impl<T> ValuePayload for T where
271 T: Sync + Send + Ord + Display + Debug + Default + Clone + Serialize + DeserializeOwned
272{
273}
274
275/// # Pathfinder consensus engine
276///
277/// This is the main consensus engine for Starknet nodes that implements
278/// Byzantine Fault Tolerant (BFT) consensus using the Malachite implementation
279/// of Tendermint. It's generic over validator addresses, consensus values, and
280/// proposer selection algorithms, making it suitable for Starknet's consensus
281/// requirements.
282///
283/// ## Generic Parameters
284///
285/// - `V`: Your consensus value type (must implement `ValuePayload`)
286/// - `A`: Your validator address type (must implement `ValidatorAddress`)
287/// - `P`: Your proposer selector type (must implement `ProposerSelector<A>`)
288///
289/// ## Usage
290///
291/// ```rust
292/// let config = Config::new(my_address);
293/// let mut consensus = Consensus::new(config);
294///
295/// // Start consensus at a height
296/// consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
297///
298/// // Poll for events
299/// while let Some(event) = consensus.next_event().await {
300/// // Handle events
301/// }
302/// ```
303///
304/// ## Custom Proposer Selection
305///
306/// To use a custom proposer selector:
307///
308/// ```rust
309/// let config = Config::new(my_address);
310/// let custom_selector = WeightedProposerSelector;
311/// let mut consensus = Consensus::with_proposer_selector(config, custom_selector);
312/// ```
313///
314/// ## Crash Recovery
315///
316/// The consensus engine supports crash recovery through write-ahead logging:
317///
318/// ```rust
319/// let validator_sets = Arc::new(StaticValidatorSetProvider::new(validator_set));
320/// let mut consensus = Consensus::recover(config, validator_sets)?;
321/// ```
322pub struct Consensus<
323 V: ValuePayload + 'static,
324 A: ValidatorAddress + 'static,
325 P: ProposerSelector<A> + Send + Sync + 'static,
326> {
327 internal: HashMap<u64, InternalConsensus<V, A, P>>,
328 event_queue: VecDeque<ConsensusEvent<V, A>>,
329 config: Config<A>,
330 proposer_selector: P,
331 min_kept_height: Option<u64>,
332 last_decided_height: Option<u64>,
333}
334
335impl<
336 V: ValuePayload + 'static,
337 A: ValidatorAddress + 'static,
338 P: ProposerSelector<A> + Send + Sync + 'static,
339 > Consensus<V, A, P>
340{
341 /// Create a new consensus engine for the current validator.
342 ///
343 /// ## Arguments
344 ///
345 /// - `config`: The consensus configuration containing validator address,
346 /// timeouts, and other settings
347 ///
348 /// ## Example
349 ///
350 /// ```rust
351 /// let config = Config::new(my_address);
352 /// let mut consensus = Consensus::new(config);
353 /// ```
354 pub fn new(config: Config<A>) -> DefaultConsensus<V, A> {
355 Consensus {
356 internal: HashMap::new(),
357 event_queue: VecDeque::new(),
358 config,
359 proposer_selector: RoundRobinProposerSelector,
360 min_kept_height: None,
361 last_decided_height: None,
362 }
363 }
364
365 /// Create a new consensus engine with a custom proposer selector for this
366 /// consensus engine.
367 ///
368 /// ## Arguments
369 ///
370 /// - `config`: The consensus configuration containing validator address,
371 /// timeouts, and other settings
372 /// - `proposer_selector`: The proposer selection algorithm to use
373 ///
374 /// ## Example
375 ///
376 /// ```rust
377 /// let config = Config::new(my_address);
378 /// let custom_selector = WeightedProposerSelector;
379 /// let mut consensus = Consensus::with_proposer_selector(config, custom_selector);
380 /// ```
381 pub fn with_proposer_selector<PS: ProposerSelector<A> + Send + Sync + 'static>(
382 config: Config<A>,
383 proposer_selector: PS,
384 ) -> Consensus<V, A, PS> {
385 Consensus {
386 internal: HashMap::new(),
387 event_queue: VecDeque::new(),
388 config,
389 proposer_selector,
390 min_kept_height: None,
391 last_decided_height: None,
392 }
393 }
394
395 /// Recover recent heights from the write-ahead log.
396 ///
397 /// This method is used to recover consensus state after a crash or restart.
398 /// It reads the write-ahead log and reconstructs the consensus state for
399 /// all incomplete heights.
400 ///
401 /// ## Arguments
402 ///
403 /// - `config`: The consensus configuration
404 /// - `validator_sets`: A provider for validator sets at different heights
405 /// - `highest_committed`: The highest committed block in main storage
406 ///
407 /// ## Example
408 ///
409 /// ```rust
410 /// let validator_sets = Arc::new(StaticValidatorSetProvider::new(validator_set));
411 /// let mut consensus = Consensus::recover(config, validator_sets)?;
412 /// ```
413 pub fn recover<VS: ValidatorSetProvider<A> + 'static>(
414 config: Config<A>,
415 validator_sets: Arc<VS>,
416 highest_committed: Option<u64>,
417 ) -> anyhow::Result<DefaultConsensus<V, A>> {
418 Self::recover_inner(
419 Self::new(config.clone()),
420 config,
421 validator_sets,
422 highest_committed,
423 )
424 }
425
426 /// Recover recent heights from the write-ahead log with a custom proposer
427 /// selector for this consensus engine.
428 ///
429 /// This method is used to recover consensus state after a crash or restart.
430 /// It reads the write-ahead log and reconstructs the consensus state for
431 /// all incomplete heights.
432 ///
433 /// ## Arguments
434 ///
435 /// - `config`: The consensus configuration
436 /// - `validator_sets`: A provider for validator sets at different heights
437 /// - `proposer_selector`: The proposer selection algorithm to use
438 ///
439 /// ## Example
440 ///
441 /// ```rust
442 /// let validator_sets = Arc::new(StaticValidatorSetProvider::new(validator_set));
443 /// let mut consensus = Consensus::recover(config, validator_sets)?;
444 /// ```
445 pub fn recover_with_proposal_selector<
446 VS: ValidatorSetProvider<A> + 'static,
447 PS: ProposerSelector<A> + Send + Sync + 'static,
448 >(
449 config: Config<A>,
450 validator_sets: Arc<VS>,
451 proposer_selector: PS,
452 highest_committed: Option<u64>,
453 ) -> anyhow::Result<Consensus<V, A, PS>> {
454 Self::recover_inner(
455 Self::with_proposer_selector(config.clone(), proposer_selector),
456 config,
457 validator_sets,
458 highest_committed,
459 )
460 }
461
462 fn recover_inner<
463 VS: ValidatorSetProvider<A> + 'static,
464 PS: ProposerSelector<A> + Send + Sync + 'static,
465 >(
466 mut consensus: Consensus<V, A, PS>,
467 config: Config<A>,
468 validator_sets: Arc<VS>,
469 highest_committed: Option<u64>,
470 ) -> anyhow::Result<Consensus<V, A, PS>> {
471 use crate::wal::recovery;
472
473 tracing::info!(
474 validator = ?config.address,
475 wal_dir = %config.wal_dir.display(),
476 "Starting consensus recovery from WAL"
477 );
478
479 // Read the write-ahead log and recover all incomplete heights.
480 // This also returns finalized heights and the highest Decision height found
481 // (even in finalized heights).
482 let (incomplete_heights, finalized_heights, highest_decision) =
483 match recovery::recover_incomplete_heights(&config.wal_dir, highest_committed) {
484 Ok((incomplete, finalized, decision_height)) => {
485 tracing::info!(
486 validator = ?config.address,
487 incomplete_heights = incomplete.len(),
488 finalized_heights = finalized.len(),
489 highest_decision = ?decision_height,
490 "Found incomplete and finalized heights in WAL"
491 );
492 (incomplete, finalized, decision_height)
493 }
494 Err(e) => {
495 tracing::error!(
496 validator = ?config.address,
497 wal_dir = %config.wal_dir.display(),
498 error = %e,
499 "Failed to recover incomplete heights from WAL"
500 );
501 (Vec::new(), Vec::new(), None)
502 }
503 };
504
505 // Set last_decided_height from the highest Decision found during recovery.
506 consensus.last_decided_height = highest_decision;
507 if let Some(h) = highest_decision {
508 tracing::info!(
509 validator = ?consensus.config.address,
510 last_decided_height = %h,
511 "Set last_decided_height from WAL recovery"
512 );
513 }
514
515 // Determine the maximum height we're recovering (incomplete or finalized).
516 let max_height = incomplete_heights
517 .iter()
518 .chain(finalized_heights.iter())
519 .map(|(height, _)| *height)
520 .max()
521 .or(highest_decision);
522
523 // Calculate the minimum height to keep based on history_depth.
524 // This matches the pruning logic used during normal operation.
525 let min_height_to_restore =
526 max_height.and_then(|max| max.checked_sub(config.history_depth));
527
528 // Restore finalized heights that are within history_depth.
529 // This ensures we can accept votes for these heights, matching the behavior
530 // during normal operation where finalized heights remain in memory until
531 // pruned.
532 for (height, entries) in finalized_heights {
533 // Only restore finalized heights that are within history_depth.
534 // (if max_height < history_depth, restore all finalized heights)
535 let should_restore = min_height_to_restore
536 .map(|min| height >= min)
537 .unwrap_or(true);
538
539 if should_restore {
540 tracing::info!(
541 validator = ?consensus.config.address,
542 height = %height,
543 "Restoring finalized height within history_depth"
544 );
545
546 let validator_set = validator_sets.get_validator_set(height)?;
547 let mut internal_consensus = consensus.create_consensus(height, &validator_set);
548
549 // Recover from WAL first to restore the engine state.
550 internal_consensus.recover_from_wal(entries);
551
552 // Only call StartHeight if the height is not already finalized.
553 if !internal_consensus.is_finalized() {
554 internal_consensus
555 .handle_command(ConsensusCommand::StartHeight(height, validator_set));
556 }
557
558 consensus.internal.insert(height, internal_consensus);
559 } else {
560 tracing::debug!(
561 validator = ?consensus.config.address,
562 height = %height,
563 min_height = ?min_height_to_restore,
564 "Skipping finalized height outside history_depth"
565 );
566 }
567 }
568
569 // Manually recover all incomplete heights.
570 for (height, entries) in incomplete_heights {
571 tracing::info!(
572 validator = ?consensus.config.address,
573 height = %height,
574 entry_count = entries.len(),
575 "Recovering incomplete height from WAL"
576 );
577
578 let validator_set = validator_sets.get_validator_set(height)?;
579 let mut internal_consensus = consensus.create_consensus(height, &validator_set);
580 internal_consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
581 internal_consensus.recover_from_wal(entries);
582 consensus.internal.insert(height, internal_consensus);
583 }
584
585 // Set min_kept_height to match what we've restored, so that is_height_finalized
586 // correctly identifies finalized heights that were pruned.
587 consensus.min_kept_height = min_height_to_restore;
588
589 tracing::info!(
590 validator = ?consensus.config.address,
591 recovered_heights = consensus.internal.len(),
592 last_decided_height = ?consensus.last_decided_height,
593 min_kept_height = ?consensus.min_kept_height,
594 "Completed consensus recovery"
595 );
596
597 Ok(consensus)
598 }
599
600 fn create_consensus(
601 &mut self,
602 height: u64,
603 validator_set: &ValidatorSet<A>,
604 ) -> InternalConsensus<V, A, P> {
605 let params = InternalParams {
606 height,
607 validator_set: validator_set.clone(),
608 address: self.config.address.clone(),
609 threshold_params: self.config.threshold_params,
610 value_payload: malachite_types::ValuePayload::ProposalOnly,
611 };
612
613 // Create a WAL for the height. If we fail, use a NoopWal.
614 let wal = match FileWalSink::new(&self.config.address, height, &self.config.wal_dir) {
615 Ok(wal) => Box::new(wal) as Box<dyn WalSink<V, A>>,
616 Err(e) => {
617 tracing::error!(
618 validator = ?self.config.address,
619 height = %height,
620 error = %e,
621 "Failed to create wal for height"
622 );
623 Box::new(NoopWal)
624 }
625 };
626
627 // A new consensus is created for every new height.
628 InternalConsensus::new(
629 params,
630 self.config.timeout_values.clone(),
631 wal,
632 self.proposer_selector.clone(),
633 )
634 }
635
636 /// Feed a command into the consensus engine.
637 ///
638 /// This method is the primary way to interact with the consensus engine.
639 /// Commands include starting new heights, submitting proposals, and
640 /// processing votes.
641 ///
642 /// ## Arguments
643 ///
644 /// - `cmd`: The command to process
645 ///
646 /// ## Example
647 ///
648 /// ```rust
649 /// // Start a new height
650 /// consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
651 ///
652 /// // Submit a proposal
653 /// consensus.handle_command(ConsensusCommand::Proposal(signed_proposal));
654 ///
655 /// // Process a vote
656 /// consensus.handle_command(ConsensusCommand::Vote(signed_vote));
657 /// ```
658 pub fn handle_command(&mut self, cmd: ConsensusCommand<V, A>) {
659 match cmd {
660 // Start a new height.
661 ConsensusCommand::StartHeight(height, validator_set) => {
662 // A new consensus is created for every new height.
663 let mut consensus = self.create_consensus(height, &validator_set);
664 consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
665 self.internal.insert(height, consensus);
666 tracing::debug!(
667 validator = ?self.config.address,
668 height = %height,
669 "Started new consensus"
670 );
671 }
672 other => {
673 let height = other.height();
674 if let Some(engine) = self.internal.get_mut(&height) {
675 engine.handle_command(other);
676 } else {
677 tracing::warn!(
678 validator = ?self.config.address,
679 height = %height,
680 command = ?other,
681 "Received command for unknown height"
682 );
683 }
684 }
685 }
686 }
687
688 /// Poll all engines for an event.
689 ///
690 /// This method should be called regularly to process events from the
691 /// consensus engine. Events include requests for proposals, decisions,
692 /// gossip messages, and errors.
693 ///
694 /// ## Returns
695 ///
696 /// Returns `Some(event)` if an event is available, or `None` if no events
697 /// are ready.
698 ///
699 /// ## Example
700 ///
701 /// ```rust
702 /// while let Some(event) = consensus.next_event().await {
703 /// match event {
704 /// ConsensusEvent::RequestProposal { height, round } => {
705 /// // Build and submit a proposal
706 /// }
707 /// ConsensusEvent::Decision { height, value } => {
708 /// // Consensus reached, process the value
709 /// }
710 /// ConsensusEvent::Gossip(message) => {
711 /// // Send message to peers
712 /// }
713 /// ConsensusEvent::Error(error) => {
714 /// // Handle error
715 /// }
716 /// }
717 /// }
718 /// ```
719 pub async fn next_event(&mut self) -> Option<ConsensusEvent<V, A>> {
720 let mut finished_heights = Vec::new();
721 // Drain each internal engine.
722 for (height, engine) in self.internal.iter_mut() {
723 if let Some(event) = engine.poll_internal().await {
724 tracing::trace!(
725 validator = ?self.config.address,
726 height = %height,
727 event = ?event,
728 "Engine returned event"
729 );
730 // Track finished heights and update last_decided_height.
731 if let ConsensusEvent::Decision { height, .. } = &event {
732 finished_heights.push(*height);
733 // Update last_decided_height to track the highest decided height.
734 self.last_decided_height = Some(
735 self.last_decided_height
736 .map(|h| h.max(*height))
737 .unwrap_or(*height),
738 );
739 }
740 // Push the event to the queue.
741 self.event_queue.push_back(event);
742 }
743 }
744
745 // Prune old engines if we have any finished heights.
746 if !finished_heights.is_empty() {
747 self.prune_old_engines();
748 }
749
750 // Return the first event from the queue.
751 self.event_queue.pop_front()
752 }
753
754 /// Prune old engines from the internal map.
755 fn prune_old_engines(&mut self) {
756 let max_height = self.internal.keys().max().copied();
757 if let Some(max_height) = max_height {
758 let new_min_height = max_height.checked_sub(self.config.history_depth);
759
760 if let Some(new_min) = new_min_height {
761 // Collect heights that will be pruned (before we remove them from the map).
762 let pruned_heights: Vec<u64> = self
763 .internal
764 .keys()
765 .filter(|height| **height < new_min)
766 .copied()
767 .collect();
768
769 // Prune the internal map and set the new min_kept_height.
770 self.min_kept_height = Some(new_min);
771 self.internal.retain(|height, _| *height >= new_min);
772
773 // Delete WAL files for pruned heights.
774 for height in &pruned_heights {
775 if let Err(e) =
776 delete_wal_file(&self.config.address, *height, &self.config.wal_dir)
777 {
778 tracing::warn!(
779 validator = ?self.config.address,
780 height = %height,
781 error = %e,
782 "Failed to delete WAL file for pruned height"
783 );
784 }
785 }
786
787 tracing::debug!(
788 validator = ?self.config.address,
789 min_height = %new_min,
790 max_height = %max_height,
791 pruned_count = pruned_heights.len(),
792 "Pruned old consensus engines and deleted WAL files"
793 );
794 }
795 }
796 }
797
798 /// Check if a specific height has been finalized (i.e., a decision has been
799 /// reached)
800 ///
801 /// ## Arguments
802 ///
803 /// - `height`: The height to check
804 ///
805 /// ## Returns
806 ///
807 /// Returns `true` if the height has been finalized, `false` otherwise.
808 ///
809 /// ## Example
810 ///
811 /// ```rust
812 /// if consensus.is_height_finalized(height) {
813 /// println!("Height {} has been finalized", height);
814 /// }
815 /// ```
816 pub fn is_height_finalized(&self, height: u64) -> bool {
817 if let Some(engine) = self.internal.get(&height) {
818 engine.is_finalized()
819 } else {
820 // If the height is not in our internal map, it might have been pruned
821 // after being finalized, so we assume it's finalized
822 if let Some(min_height) = self.min_kept_height {
823 if height < min_height {
824 return true;
825 }
826 }
827 false
828 }
829 }
830
831 /// Check if a specific height is actively tracked by the consensus engine.
832 ///
833 /// ## Arguments
834 ///
835 /// - `height`: The height to check
836 ///
837 /// ## Returns
838 ///
839 /// Returns `true` if the height is active, `false` otherwise.
840 pub fn is_height_active(&self, height: u64) -> bool {
841 self.internal.contains_key(&height)
842 }
843
844 /// Get the maximum height actively being tracked by the consensus engine.
845 ///
846 /// This returns the highest height that consensus is currently working on,
847 /// which includes incomplete heights that haven't reached a decision yet.
848 /// Returns `None` if there are no actively tracked heights.
849 pub fn max_active_height(&self) -> Option<u64> {
850 self.internal.keys().max().copied()
851 }
852
853 /// Get the highest height that consensus has decided on.
854 ///
855 /// This returns the highest height that has a Decision entry, even if that
856 /// height is no longer actively tracked (e.g., after recovery when it was
857 /// skipped).
858 ///
859 /// Returns `None` if no decisions have been made yet.
860 pub fn last_decided_height(&self) -> Option<u64> {
861 self.last_decided_height
862 }
863}
864
865/// A round number (or `None` if the round is nil).
866///
867/// This type represents a consensus round number. A round can be either a
868/// specific round number or nil (None), which represents a special state in the
869/// consensus protocol.
870///
871/// ## Example
872///
873/// ```rust
874/// let round = Round::new(5); // Round 5
875/// let nil_round = Round::nil(); // Nil round
876/// ```
877#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
878pub struct Round(pub Option<u32>);
879
880impl Round {
881 /// Create a new round with the given round number.
882 pub fn new(round: u32) -> Self {
883 Self(Some(round))
884 }
885
886 /// Create a nil round.
887 pub fn nil() -> Self {
888 Self(None)
889 }
890
891 /// Get the round number as a u32, if it's not nil.
892 pub fn as_u32(&self) -> Option<u32> {
893 self.0
894 }
895}
896
897impl From<u32> for Round {
898 fn from(round: u32) -> Self {
899 Self::new(round)
900 }
901}
902
903impl Add<u32> for Round {
904 type Output = Self;
905
906 fn add(self, rhs: u32) -> Self::Output {
907 Self(self.0.map(|round| round + rhs))
908 }
909}
910
911impl Sub<u32> for Round {
912 type Output = Self;
913
914 fn sub(self, rhs: u32) -> Self::Output {
915 Self(self.0.map(|round| round - rhs))
916 }
917}
918
919impl Display for Round {
920 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
921 match self.0 {
922 Some(round) => write!(f, "{round}"),
923 None => write!(f, "Nil"),
924 }
925 }
926}
927
928impl Debug for Round {
929 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
930 write!(f, "{self}")
931 }
932}
933
934/// A proposal for a block value in a consensus round.
935///
936/// A proposal is created by the designated proposer for a given height and
937/// round. It contains the proposed block value along with additional metadata.
938#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
939pub struct Proposal<V, A> {
940 /// The blockchain height
941 pub height: u64,
942 /// The consensus round number
943 pub round: Round,
944 /// The proposed consensus value
945 pub value: V,
946 /// The POL round for which the proposal is for
947 pub pol_round: Round,
948 /// The address of the proposer
949 pub proposer: A,
950}
951
952impl<V: Debug, A: Debug> std::fmt::Debug for Proposal<V, A> {
953 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
954 write!(
955 f,
956 "H:{} R:{} From:{:?} Val:{:?}",
957 self.height, self.round, self.proposer, self.value
958 )
959 }
960}
961
962/// The type of vote.
963#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
964pub enum VoteType {
965 /// A preliminary vote
966 Prevote,
967 /// A final vote that commits to a value
968 Precommit,
969}
970
971/// A vote for a value in a consensus round.
972///
973/// A vote is cast by a validator to indicate their agreement or disagreement
974/// with a proposed block value. The vote includes the validator's address, the
975/// round number, and the block value being voted on.
976#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
977pub struct Vote<V, A> {
978 /// The type of vote (Prevote or Precommit)
979 pub r#type: VoteType,
980 /// The blockchain height
981 pub height: u64,
982 /// The consensus round number
983 pub round: Round,
984 /// The value being voted on (None for nil votes)
985 pub value: Option<V>,
986 /// The address of the validator casting the vote
987 pub validator_address: A,
988}
989
990impl<V: Debug, A: Debug> std::fmt::Debug for Vote<V, A> {
991 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
992 let val = match &self.value {
993 Some(val) => format!("{val:?}"),
994 None => "Nil".to_string(),
995 };
996 write!(
997 f,
998 "H:{} R:{} {:?} From:{:?} Val:{val}",
999 self.height, self.round, self.r#type, self.validator_address
1000 )
1001 }
1002}
1003
1004impl<V, A> Vote<V, A> {
1005 /// Check if the vote is nil.
1006 ///
1007 /// A nil vote is a vote that does not commit to a value.
1008 pub fn is_nil(&self) -> bool {
1009 self.value.is_none()
1010 }
1011}
1012
1013/// A fully validated, signed proposal ready to enter consensus.
1014///
1015/// This type wraps a proposal with a cryptographic signature to ensure
1016/// authenticity and integrity.
1017#[derive(Clone, Serialize, Deserialize)]
1018pub struct SignedProposal<V, A> {
1019 pub proposal: Proposal<V, A>,
1020 pub signature: Signature,
1021}
1022
1023/// A signed vote.
1024///
1025/// This type wraps a vote with a cryptographic signature to ensure
1026/// authenticity and integrity.
1027#[derive(Clone, Serialize, Deserialize)]
1028pub struct SignedVote<V, A> {
1029 pub vote: Vote<V, A>,
1030 pub signature: Signature,
1031}
1032
1033// Note: We intentionally ignore the signature as it's not used yet.
1034impl<V: Debug, A: Debug> std::fmt::Debug for SignedProposal<V, A> {
1035 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1036 write!(f, "{:?}", self.proposal)
1037 }
1038}
1039
1040// Note: We intentionally ignore the signature as it's not used yet.
1041impl<V: Debug, A: Debug> std::fmt::Debug for SignedVote<V, A> {
1042 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1043 write!(f, "{:?}", self.vote)
1044 }
1045}
1046
1047/// A public key for the consensus protocol.
1048///
1049/// This type is used to verify signatures on proposals and votes in the
1050/// consensus protocol. Each validator has an associated public key that is used
1051/// to authenticate their messages.
1052pub type PublicKey = malachite_signing_ed25519::PublicKey;
1053
1054/// A validator's voting power.
1055pub type VotingPower = u64;
1056
1057/// A validator in the consensus protocol.
1058///
1059/// Each validator has an associated address and public key to uniquely identify
1060/// them. The voting power determines their weight in consensus decisions.
1061#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
1062pub struct Validator<A> {
1063 /// The validator's address
1064 pub address: A,
1065 /// The validator's public key for signature verification
1066 pub public_key: PublicKey,
1067 /// The validator's voting power (weight in consensus)
1068 pub voting_power: VotingPower,
1069}
1070
1071impl<A: Debug> std::fmt::Debug for Validator<A> {
1072 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1073 write!(f, "{:?} ({})", self.address, self.voting_power)
1074 }
1075}
1076
1077impl<A> Validator<A> {
1078 /// Create a new validator with the given address and public key.
1079 ///
1080 /// The voting power defaults to 1.
1081 pub fn new(address: A, public_key: PublicKey) -> Self {
1082 Self {
1083 address,
1084 public_key,
1085 voting_power: 1,
1086 }
1087 }
1088
1089 /// Set the voting power for the validator.
1090 ///
1091 /// This method returns `self` for method chaining.
1092 pub fn with_voting_power(mut self, voting_power: VotingPower) -> Self {
1093 self.voting_power = voting_power;
1094 self
1095 }
1096}
1097
1098/// A validator set represents a group of consensus participants.
1099///
1100/// The validator set defines who can participate in consensus at a given
1101/// height. Each validator in the set has a voting power that determines their
1102/// weight in consensus decisions.
1103#[derive(Clone, Debug, PartialEq, Eq)]
1104pub struct ValidatorSet<A> {
1105 /// The list of validators in the set
1106 pub validators: Vec<Validator<A>>,
1107}
1108
1109impl<A: Clone + Ord> ValidatorSet<A> {
1110 /// Create a new validator set with the given validators.
1111 pub fn new(validators: impl IntoIterator<Item = Validator<A>>) -> Self {
1112 // Ensure validators are unique by address.
1113 let validators: BTreeMap<A, Validator<A>> = validators
1114 .into_iter()
1115 .map(|v| (v.address.clone(), v))
1116 .collect();
1117 assert!(!validators.is_empty());
1118 let validators = validators.into_values().collect::<Vec<Validator<A>>>();
1119 Self { validators }
1120 }
1121
1122 /// Get the number of validators in the set.
1123 pub fn count(&self) -> usize {
1124 self.validators.len()
1125 }
1126}
1127
1128/// Commands that the application can send into the consensus engine.
1129///
1130/// These commands represent the primary interface for interacting with the
1131/// consensus engine. They allow the application to start new heights,
1132/// submit proposals, and process votes.
1133pub enum ConsensusCommand<V, A> {
1134 /// Start consensus at a given height with the validator set.
1135 StartHeight(u64, ValidatorSet<A>),
1136 /// A complete, locally validated and signed proposal that we create as the
1137 /// proposer for the current round.
1138 Propose(Proposal<V, A>),
1139 /// A complete, locally validated and signed proposal that was received over
1140 /// the network from another validator.
1141 Proposal(SignedProposal<V, A>),
1142 /// A signed vote received from the network.
1143 Vote(SignedVote<V, A>),
1144}
1145
1146impl<V, A> ConsensusCommand<V, A> {
1147 /// Get the consensus height associated with the command.
1148 pub fn height(&self) -> u64 {
1149 match self {
1150 ConsensusCommand::StartHeight(height, _) => *height,
1151 ConsensusCommand::Propose(proposal) => proposal.height,
1152 ConsensusCommand::Proposal(proposal) => proposal.proposal.height,
1153 ConsensusCommand::Vote(vote) => vote.vote.height,
1154 }
1155 }
1156}
1157
1158impl<V: Debug, A: Debug> std::fmt::Debug for ConsensusCommand<V, A> {
1159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1160 match self {
1161 ConsensusCommand::StartHeight(height, validator_set) => write!(
1162 f,
1163 "StartHeight({}, [{}])",
1164 height,
1165 validator_set
1166 .validators
1167 .iter()
1168 .map(|v| format!("{:?}", v.address))
1169 .collect::<Vec<_>>()
1170 .join(", ")
1171 ),
1172 ConsensusCommand::Propose(proposal) => write!(f, "Propose({proposal:?})"),
1173 ConsensusCommand::Proposal(proposal) => write!(f, "Proposal({proposal:?})"),
1174 ConsensusCommand::Vote(vote) => write!(f, "Vote({vote:?})"),
1175 }
1176 }
1177}
1178
1179/// A message to be gossiped to peers.
1180///
1181/// These messages represent network communication that needs to be sent to
1182/// other validators in the network.
1183#[derive(Clone, Debug)]
1184pub enum NetworkMessage<V, A> {
1185 /// A complete, locally validated and signed proposal ready to be gossiped.
1186 Proposal(SignedProposal<V, A>),
1187 /// A vote received from the network.
1188 Vote(SignedVote<V, A>),
1189}
1190
1191/// Events that the consensus engine emits for the application to handle.
1192///
1193/// These events represent the output of the consensus engine and tell the
1194/// application what actions it needs to take.
1195pub enum ConsensusEvent<V, A> {
1196 /// The consensus wants this message to be gossiped to peers.
1197 ///
1198 /// The application should send this message to all peers in the network.
1199 Gossip(NetworkMessage<V, A>),
1200 /// The consensus needs the app to build and inject a proposal.
1201 ///
1202 /// The application should create a proposal for the given height and round,
1203 /// then submit it to the consensus engine.
1204 RequestProposal { height: u64, round: u32 },
1205 /// The consensus has reached a decision and committed a block.
1206 ///
1207 /// This event indicates that consensus has been reached for the given
1208 /// height and the value should be committed to the blockchain.
1209 Decision { height: u64, round: u32, value: V },
1210 /// An internal error occurred in consensus.
1211 ///
1212 /// The application should handle this error appropriately, possibly by
1213 /// logging it or taking corrective action.
1214 Error(ConsensusError),
1215}
1216
1217impl<V: Debug, A: Debug> std::fmt::Debug for ConsensusEvent<V, A> {
1218 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1219 match self {
1220 ConsensusEvent::Gossip(msg) => match msg {
1221 NetworkMessage::Proposal(proposal) => write!(f, "Gossip(Proposal({proposal:?}))"),
1222 NetworkMessage::Vote(vote) => write!(f, "Gossip(Vote({vote:?}))"),
1223 },
1224 ConsensusEvent::RequestProposal { height, round, .. } => {
1225 write!(f, "RequestProposal(H:{height} R:{round})")
1226 }
1227 ConsensusEvent::Decision {
1228 height,
1229 round,
1230 value,
1231 } => {
1232 write!(f, "Decision(H:{height} R: {round} Val:{value:?})")
1233 }
1234 ConsensusEvent::Error(error) => write!(f, "Error({error:?})"),
1235 }
1236 }
1237}
1238
1239/// A trait for retrieving the validator set at a specific blockchain height.
1240///
1241/// This trait allows consensus to dynamically determine the set of validators
1242/// that are eligible to participate in consensus at any given height.
1243///
1244/// This is useful for handling validator set changes across heights.
1245pub trait ValidatorSetProvider<A> {
1246 fn get_validator_set(&self, height: u64) -> Result<ValidatorSet<A>, anyhow::Error>;
1247}
1248
1249/// A trait for selecting the proposer for a given height and round.
1250///
1251/// This trait allows consumers to provide custom proposer selection logic
1252/// instead of using the default round-robin approach. This is useful for
1253/// implementing more sophisticated proposer selection algorithms like
1254/// VRF-based selection, weighted selection, or other custom logic.
1255///
1256/// ## Example Implementation
1257///
1258/// ```rust
1259/// use pathfinder_consensus::*;
1260///
1261/// struct RoundRobinProposerSelector;
1262///
1263/// impl<A: ValidatorAddress> ProposerSelector<A> for RoundRobinProposerSelector {
1264/// fn select_proposer<'a>(
1265/// &self,
1266/// validator_set: &'a ValidatorSet<A>,
1267/// height: u64,
1268/// round: u32,
1269/// ) -> &'a Validator<A> {
1270/// let index = round as usize % validator_set.count();
1271/// &validator_set.validators[index]
1272/// }
1273/// }
1274/// ```
1275pub trait ProposerSelector<A: ValidatorAddress>: Clone + Send + Sync {
1276 /// Select the proposer for the given height and round.
1277 ///
1278 /// ## Arguments
1279 ///
1280 /// - `validator_set`: The set of validators eligible to propose
1281 /// - `height`: The blockchain height
1282 /// - `round`: The consensus round number
1283 ///
1284 /// ## Returns
1285 ///
1286 /// Returns a reference to the selected validator who should propose
1287 /// for the given height and round.
1288 fn select_proposer<'a>(
1289 &self,
1290 validator_set: &'a ValidatorSet<A>,
1291 height: u64,
1292 round: u32,
1293 ) -> &'a Validator<A>;
1294}
1295
1296/// A default proposer selector that uses round-robin selection.
1297///
1298/// This is the default proposer selection algorithm that selects proposers
1299/// in a round-robin fashion based on the round number modulo the number of
1300/// validators.
1301#[derive(Clone, Default)]
1302pub struct RoundRobinProposerSelector;
1303
1304impl<A: ValidatorAddress> ProposerSelector<A> for RoundRobinProposerSelector {
1305 fn select_proposer<'a>(
1306 &self,
1307 validator_set: &'a ValidatorSet<A>,
1308 _height: u64,
1309 round: u32,
1310 ) -> &'a Validator<A> {
1311 let index = round as usize % validator_set.count();
1312 &validator_set.validators[index]
1313 }
1314}
1315
1316/// A type alias for consensus with the default round-robin proposer selector.
1317///
1318/// This provides a convenient way to create consensus instances without
1319/// specifying the proposer selector type.
1320pub type DefaultConsensus<V, A> = Consensus<V, A, RoundRobinProposerSelector>;
1321
1322/// A validator set provider that always returns the same validator set.
1323///
1324/// This is a simple implementation of `ValidatorSetProvider` that returns
1325/// the same validator set for all heights. This is useful for testing or
1326/// for applications where the validator set doesn't change.
1327pub struct StaticValidatorSetProvider<A> {
1328 validator_set: ValidatorSet<A>,
1329}
1330
1331impl<A> StaticValidatorSetProvider<A> {
1332 /// Create a new static validator set provider.
1333 ///
1334 /// ## Arguments
1335 ///
1336 /// - `validator_set`: The validator set to return for all heights
1337 pub fn new(validator_set: ValidatorSet<A>) -> Self {
1338 Self { validator_set }
1339 }
1340}
1341
1342impl<A: Clone + Send + Sync> ValidatorSetProvider<A> for StaticValidatorSetProvider<A> {
1343 fn get_validator_set(&self, _height: u64) -> Result<ValidatorSet<A>, anyhow::Error> {
1344 Ok(self.validator_set.clone())
1345 }
1346}
1347
1348#[cfg(test)]
1349mod tests {
1350 use super::*;
1351
1352 #[test]
1353 fn regression_validator_set_is_unique_by_address() {
1354 let with_duplicates = [1, 1, 2, 2, 2, 3, 3, 3, 3, 2, 1, 1, 2, 3, 2, 2, 1, 1, 3, 3]
1355 .into_iter()
1356 .map(|i| Validator::new(i, crate::PublicKey::from_bytes([0; 32])));
1357 let set = ValidatorSet::new(with_duplicates);
1358
1359 assert_eq!(
1360 set.validators.iter().map(|v| v.address).collect::<Vec<_>>(),
1361 vec![1, 2, 3]
1362 );
1363 }
1364}