// guts_consensus/simplex/engine.rs

1//! Simplex BFT consensus engine.
2//!
3//! This module provides the main engine that orchestrates the Simplex BFT
4//! consensus protocol for the Guts decentralized code collaboration platform.
5//!
6//! # Architecture
7//!
8//! The engine integrates with commonware-consensus's Simplex implementation
9//! to provide Byzantine fault-tolerant consensus. The key components are:
10//!
11//! - **Application Actor**: Handles block proposal and verification
12//! - **Marshal Actor**: Manages block storage and synchronization
13//! - **Buffer Engine**: Buffers broadcast messages for efficient delivery
14//! - **Consensus Engine**: Core BFT voting logic (Batcher, Voter, Resolver)
15//!
16//! # Usage
17//!
18//! The engine requires P2P channels for:
19//! - `pending`: Pending consensus votes
20//! - `recovered`: Recovered messages after reconnection
21//! - `resolver`: Fetching missing certificates
22//! - `broadcast`: Block broadcast messages
23//! - `marshal`: Block sync messages
24
25use super::{
26    application::{self, Actor as ApplicationActor, Mailbox as ApplicationMailbox},
27    block::SimplexBlock,
28    types::Scheme,
29};
30use commonware_broadcast::buffered;
31use commonware_consensus::{
32    marshal::{self, ingress::handler},
33    simplex::{self, Engine as Consensus},
34};
35use commonware_cryptography::{ed25519::PublicKey, sha256::Digest};
36use commonware_p2p::{Blocker, Receiver, Sender};
37use commonware_resolver::Resolver;
38use commonware_runtime::{buffer::PoolRef, Clock, Handle, Metrics, Spawner, Storage};
39use commonware_utils::{set::Ordered, NZUsize, NZU64};
40use futures::{channel::mpsc, future::try_join_all};
41use governor::clock::Clock as GClock;
42use governor::Quota;
43use rand::{CryptoRng, Rng};
44use std::{marker::PhantomData, num::NonZero, sync::Arc, time::Duration};
45use tracing::{error, info, warn};
46
/// Type alias for the finalization callback.
///
/// A thread-safe, cloneable closure invoked for each finalized
/// [`SimplexBlock`]; wrapped in an [`Arc`] so it can be shared across actors.
pub type FinalizedCallback = Arc<dyn Fn(&SimplexBlock) + Send + Sync>;

/// Namespace for Guts consensus messages.
///
/// Used to domain-separate signed messages of this deployment from other
/// consensus instances.
pub const NAMESPACE: &[u8] = b"guts-consensus";

/// Epoch for the consensus instance.
pub const EPOCH: u64 = 0;

/// Epoch length (effectively infinite for single-epoch operation).
pub const EPOCH_LENGTH: u64 = u64::MAX;

// Buffer and storage constants

/// Items stored per prunable (view-scoped) storage section.
const PRUNABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(4_096);
/// Items stored per immutable (finalized) storage section.
const IMMUTABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(262_144);
/// How often the freezer table is resized (unit defined by commonware).
const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;
/// Number of entries added per freezer table resize step.
const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16);
const FREEZER_JOURNAL_TARGET_SIZE: u64 = 1024 * 1024 * 1024; // 1GB
// NOTE(review): presumably a zstd compression level — confirm against the
// commonware freezer journal documentation.
const FREEZER_JOURNAL_COMPRESSION: Option<u8> = Some(3);
const REPLAY_BUFFER: NonZero<usize> = NZUsize!(8 * 1024 * 1024); // 8MB
const WRITE_BUFFER: NonZero<usize> = NZUsize!(1024 * 1024); // 1MB
const BUFFER_POOL_PAGE_SIZE: NonZero<usize> = NZUsize!(4_096); // 4KB
// 8_192 pages x 4KB page size = 32MB total pool capacity.
const BUFFER_POOL_CAPACITY: NonZero<usize> = NZUsize!(8_192); // 32MB
/// Maximum number of blocks the marshal repairs in one pass.
const MAX_REPAIR: u64 = 20;
/// Multiplier applied to `activity_timeout` to derive the marshal's
/// view retention timeout (see [`Engine::new`]).
const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;

/// Reporter type for the consensus engine.
///
/// Uses the marshal mailbox as the activity reporter, with no additional indexer.
type SimpleReporter = marshal::Mailbox<Scheme, SimplexBlock>;
77
78/// Static scheme provider that always returns the same signing scheme.
79#[derive(Clone)]
80pub struct StaticSchemeProvider(Arc<Scheme>);
81
82impl marshal::SchemeProvider for StaticSchemeProvider {
83    type Scheme = Scheme;
84
85    fn scheme(&self, _epoch: u64) -> Option<Arc<Scheme>> {
86        Some(self.0.clone())
87    }
88}
89
90impl From<Scheme> for StaticSchemeProvider {
91    fn from(scheme: Scheme) -> Self {
92        Self(Arc::new(scheme))
93    }
94}
95
/// Configuration for the Simplex engine.
///
/// Construct with [`Config::new`] for sensible defaults, then override fields
/// or chain [`Config::on_finalized`] as needed.
#[derive(Clone)]
pub struct Config<B: Blocker<PublicKey = PublicKey>> {
    /// The blocker for managing peer connections.
    pub blocker: B,

    /// Prefix for storage partitions.
    pub partition_prefix: String,

    /// Initial size for the blocks freezer table.
    pub blocks_freezer_table_initial_size: u32,

    /// Initial size for the finalized freezer table.
    // NOTE(review): not currently consumed by `Engine::new` (only the blocks
    // freezer size is passed to the marshal) — verify intended usage.
    pub finalized_freezer_table_initial_size: u32,

    /// Our public key.
    pub me: PublicKey,

    /// Our private key for signing.
    pub private_key: commonware_cryptography::ed25519::PrivateKey,

    /// The set of participants in consensus.
    pub participants: Ordered<PublicKey>,

    /// Size of mailbox channels.
    pub mailbox_size: usize,

    /// Size of message deques.
    pub deque_size: usize,

    /// Timeout for leader proposal.
    pub leader_timeout: Duration,

    /// Timeout for notarization.
    pub notarization_timeout: Duration,

    /// Retry interval for nullify messages.
    pub nullify_retry: Duration,

    /// Timeout for fetch requests.
    pub fetch_timeout: Duration,

    /// Activity timeout in views.
    pub activity_timeout: u64,

    /// Skip timeout in views.
    pub skip_timeout: u64,

    /// Maximum number of blocks to fetch at once.
    pub max_fetch_count: usize,

    /// Number of concurrent fetch requests.
    pub fetch_concurrent: usize,

    /// Rate limit for fetch requests per peer.
    pub fetch_rate_per_peer: Quota,

    /// Callback for finalized blocks (invoked for each finalized block).
    pub on_finalized: Option<FinalizedCallback>,
}
156
157impl<B: Blocker<PublicKey = PublicKey>> Config<B> {
158    /// Creates a new configuration with sensible defaults.
159    pub fn new(
160        blocker: B,
161        me: PublicKey,
162        private_key: commonware_cryptography::ed25519::PrivateKey,
163        participants: Vec<PublicKey>,
164    ) -> Self {
165        Self {
166            blocker,
167            partition_prefix: "guts".to_string(),
168            blocks_freezer_table_initial_size: 2u32.pow(21), // ~100MB
169            finalized_freezer_table_initial_size: 2u32.pow(21),
170            me,
171            private_key,
172            participants: participants.into_iter().collect(),
173            mailbox_size: 1024,
174            deque_size: 10,
175            leader_timeout: Duration::from_secs(1),
176            notarization_timeout: Duration::from_secs(2),
177            nullify_retry: Duration::from_secs(10),
178            fetch_timeout: Duration::from_secs(2),
179            activity_timeout: 256,
180            skip_timeout: 32,
181            max_fetch_count: 16,
182            fetch_concurrent: 4,
183            fetch_rate_per_peer: Quota::per_second(std::num::NonZeroU32::new(128).unwrap()),
184            on_finalized: None,
185        }
186    }
187
188    /// Sets the callback for finalized blocks.
189    pub fn on_finalized<F>(mut self, callback: F) -> Self
190    where
191        F: Fn(&SimplexBlock) + Send + Sync + 'static,
192    {
193        self.on_finalized = Some(Arc::new(callback));
194        self
195    }
196}
197
/// The Simplex BFT consensus engine.
///
/// Owns every actor in the consensus stack. Created with [`Engine::new`] and
/// consumed by [`Engine::start`], which spawns all actors.
pub struct Engine<E, B>
where
    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics + Clone,
    B: Blocker<PublicKey = PublicKey>,
{
    // Runtime context used to spawn actor tasks.
    context: E,

    // Application actor (block proposal/verification) and its mailbox.
    application: ApplicationActor<E>,
    application_mailbox: ApplicationMailbox,
    // Broadcast buffer engine and its mailbox.
    buffer: buffered::Engine<E, PublicKey, SimplexBlock>,
    buffer_mailbox: buffered::Mailbox<PublicKey, SimplexBlock>,
    // Marshal actor (block storage/sync) and its mailbox.
    marshal: marshal::Actor<E, SimplexBlock, StaticSchemeProvider, Scheme>,
    marshal_mailbox: marshal::Mailbox<Scheme, SimplexBlock>,

    // Core BFT voting engine; the application mailbox serves as both
    // automaton and relay, and the marshal mailbox as the reporter.
    consensus: Consensus<
        E,
        PublicKey,
        Scheme,
        B,
        Digest,
        ApplicationMailbox,
        ApplicationMailbox,
        SimpleReporter,
    >,
}
224
225impl<E, B> Engine<E, B>
226where
227    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics + Clone,
228    B: Blocker<PublicKey = PublicKey>,
229{
    /// Creates a new Simplex engine.
    ///
    /// Builds (but does not start) the application actor, broadcast buffer,
    /// marshal, and core consensus engine, wiring their mailboxes together.
    /// Call [`Engine::start`] afterwards to spawn everything.
    pub async fn new(context: E, cfg: Config<B>) -> Self {
        // Create the application actor (block proposal and verification).
        let (application, application_mailbox) = ApplicationActor::new(
            context.clone(),
            application::Config {
                mailbox_size: cfg.mailbox_size,
            },
        );

        // Create the buffer that caches broadcast blocks for delivery.
        let (buffer, buffer_mailbox) = buffered::Engine::new(
            context.clone(),
            buffered::Config {
                public_key: cfg.me.clone(),
                mailbox_size: cfg.mailbox_size,
                deque_size: cfg.deque_size,
                priority: true,
                codec_config: (),
            },
        );

        // Create the buffer pool shared by the freezer journal and consensus
        // storage (8_192 pages x 4KB pages = 32MB).
        let buffer_pool = PoolRef::new(BUFFER_POOL_PAGE_SIZE, BUFFER_POOL_CAPACITY);

        // Create the signing scheme from the participant set and our key.
        let scheme = Scheme::new(cfg.participants.clone(), cfg.private_key);

        // Create marshal (block storage and synchronization).
        // NOTE(review): only `blocks_freezer_table_initial_size` is consumed
        // here; `cfg.finalized_freezer_table_initial_size` is never read in
        // this function — confirm whether it should be wired in as well.
        let (marshal, marshal_mailbox) = marshal::Actor::init(
            context.clone(),
            marshal::Config {
                scheme_provider: scheme.clone().into(),
                epoch_length: EPOCH_LENGTH,
                partition_prefix: cfg.partition_prefix.clone(),
                mailbox_size: cfg.mailbox_size,
                // Retain views for the activity timeout scaled by the syncer
                // multiplier so slow peers can still fetch them.
                view_retention_timeout: cfg
                    .activity_timeout
                    .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
                namespace: NAMESPACE.to_vec(),
                prunable_items_per_section: PRUNABLE_ITEMS_PER_SECTION,
                immutable_items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
                freezer_table_initial_size: cfg.blocks_freezer_table_initial_size,
                freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
                freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
                freezer_journal_target_size: FREEZER_JOURNAL_TARGET_SIZE,
                freezer_journal_compression: FREEZER_JOURNAL_COMPRESSION,
                freezer_journal_buffer_pool: buffer_pool.clone(),
                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
                block_codec_config: (),
                max_repair: MAX_REPAIR,
                _marker: PhantomData,
            },
        )
        .await;

        // Use the marshal mailbox directly as the activity reporter.
        let reporter = marshal_mailbox.clone();

        // Create the consensus engine (Batcher, Voter, Resolver).
        let consensus = Consensus::new(
            context.clone(),
            simplex::Config {
                epoch: EPOCH,
                namespace: NAMESPACE.to_vec(),
                scheme,
                // The application mailbox serves as both the automaton and
                // the relay for the consensus engine.
                automaton: application_mailbox.clone(),
                relay: application_mailbox.clone(),
                reporter,
                partition: format!("{}-consensus", cfg.partition_prefix),
                mailbox_size: cfg.mailbox_size,
                leader_timeout: cfg.leader_timeout,
                notarization_timeout: cfg.notarization_timeout,
                nullify_retry: cfg.nullify_retry,
                fetch_timeout: cfg.fetch_timeout,
                activity_timeout: cfg.activity_timeout,
                skip_timeout: cfg.skip_timeout,
                max_fetch_count: cfg.max_fetch_count,
                fetch_concurrent: cfg.fetch_concurrent,
                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
                blocker: cfg.blocker,
                buffer_pool,
            },
        );

        info!(
            participants = cfg.participants.len(),
            "created Simplex BFT consensus engine"
        );

        Self {
            context,
            application,
            application_mailbox,
            buffer,
            buffer_mailbox,
            marshal,
            marshal_mailbox,
            consensus,
        }
    }
334
335    /// Starts the consensus engine.
336    ///
337    /// This method takes ownership of all the P2P channels and starts the
338    /// various actors that make up the consensus engine.
339    #[allow(clippy::too_many_arguments)]
340    pub fn start(
341        self,
342        pending: (
343            impl Sender<PublicKey = PublicKey>,
344            impl Receiver<PublicKey = PublicKey>,
345        ),
346        recovered: (
347            impl Sender<PublicKey = PublicKey>,
348            impl Receiver<PublicKey = PublicKey>,
349        ),
350        resolver: (
351            impl Sender<PublicKey = PublicKey>,
352            impl Receiver<PublicKey = PublicKey>,
353        ),
354        broadcast: (
355            impl Sender<PublicKey = PublicKey>,
356            impl Receiver<PublicKey = PublicKey>,
357        ),
358        marshal: (
359            mpsc::Receiver<handler::Message<SimplexBlock>>,
360            impl Resolver<Key = handler::Request<SimplexBlock>>,
361        ),
362    ) -> Handle<()> {
363        let context = self.context.clone();
364        context.spawn(move |_| async move {
365            self.run(pending, recovered, resolver, broadcast, marshal)
366                .await;
367        })
368    }
369
370    #[allow(clippy::too_many_arguments)]
371    async fn run(
372        self,
373        pending: (
374            impl Sender<PublicKey = PublicKey>,
375            impl Receiver<PublicKey = PublicKey>,
376        ),
377        recovered: (
378            impl Sender<PublicKey = PublicKey>,
379            impl Receiver<PublicKey = PublicKey>,
380        ),
381        resolver: (
382            impl Sender<PublicKey = PublicKey>,
383            impl Receiver<PublicKey = PublicKey>,
384        ),
385        broadcast: (
386            impl Sender<PublicKey = PublicKey>,
387            impl Receiver<PublicKey = PublicKey>,
388        ),
389        marshal: (
390            mpsc::Receiver<handler::Message<SimplexBlock>>,
391            impl Resolver<Key = handler::Request<SimplexBlock>>,
392        ),
393    ) {
394        // Start the application actor
395        let application_handle = self.context.spawn({
396            let application = self.application;
397            let marshal_mailbox = self.marshal_mailbox.clone();
398            move |_| async move {
399                application.run(marshal_mailbox).await;
400            }
401        });
402
403        // Start the buffer
404        let buffer_handle = self.buffer.start(broadcast);
405
406        // Start marshal
407        let marshal_handle =
408            self.marshal
409                .start(self.application_mailbox, self.buffer_mailbox, marshal);
410
411        // Start consensus
412        let consensus_handle = self.consensus.start(pending, recovered, resolver);
413
414        // Wait for any actor to finish
415        if let Err(e) = try_join_all(vec![
416            application_handle,
417            buffer_handle,
418            marshal_handle,
419            consensus_handle,
420        ])
421        .await
422        {
423            error!(?e, "consensus engine failed");
424        } else {
425            warn!("consensus engine stopped");
426        }
427    }
428}
429
/// Metrics from the consensus engine.
///
/// Plain snapshot of engine state; `Default` yields the zero/false state.
#[derive(Debug, Clone, Default)]
pub struct EngineMetrics {
    /// Current view number.
    pub view: u64,
    /// Last finalized height.
    pub finalized_height: u64,
    /// Number of pending transactions.
    pub pending_transactions: usize,
    /// Whether the engine is the current leader.
    pub is_leader: bool,
}