guts_consensus/
engine.rs

1//! Consensus engine implementation.
2//!
3//! This module provides the core consensus engine that integrates with
4//! commonware-consensus for BFT agreement.
5
6use crate::block::{Block, BlockId, FinalizedBlock};
7use crate::error::{ConsensusError, Result};
8use crate::mempool::Mempool;
9use crate::transaction::{
10    SerializablePublicKey, SerializableSignature, Transaction, TransactionId,
11};
12use crate::validator::ValidatorSet;
13use async_trait::async_trait;
14use commonware_cryptography::Signer;
15use parking_lot::RwLock;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{broadcast, mpsc};
20
/// Tunable parameters for the consensus engine.
#[derive(Debug, Clone)]
pub struct EngineConfig {
    /// Target interval between blocks; `ConsensusEngine::run` uses it as the
    /// period of its block-proposal timer.
    pub block_time: Duration,

    /// Upper bound on the number of transactions in a single block.
    pub max_txs_per_block: usize,

    /// Upper bound on the size of a single block, in bytes.
    pub max_block_size: usize,

    /// Multiplier used for the view timeout.
    pub view_timeout_multiplier: f64,

    /// Whether consensus is enabled (`false` for single-node mode).
    pub consensus_enabled: bool,
}

impl Default for EngineConfig {
    /// Default settings: 2 second block time, at most 1000 transactions and
    /// 10 MiB per block, a view timeout multiplier of 2.0, consensus enabled.
    fn default() -> Self {
        let block_time = Duration::from_secs(2);
        // 10 MiB expressed as a shift: 10 * 2^20 bytes.
        let max_block_size = 10 << 20;

        EngineConfig {
            block_time,
            max_txs_per_block: 1_000,
            max_block_size,
            view_timeout_multiplier: 2.0,
            consensus_enabled: true,
        }
    }
}
51
/// Lifecycle state of the consensus engine.
///
/// The engine is created in `Starting`, `ConsensusEngine::run` moves it to
/// `Active`, and `ConsensusEngine::stop` moves it to `Stopped`. Every actual
/// change is announced as a `ConsensusEvent::StateChanged` event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EngineState {
    /// Engine is starting up (the state a freshly constructed engine is in).
    Starting,
    /// Engine is syncing from peers.
    // NOTE(review): nothing in this file transitions into `Syncing` — confirm
    // whether it is set elsewhere.
    Syncing,
    /// Engine is actively participating in consensus.
    Active,
    /// Engine is a follower (non-validator).
    // NOTE(review): nothing in this file transitions into `Following` — confirm
    // whether it is set elsewhere.
    Following,
    /// Engine is stopped.
    Stopped,
}
66
/// Events emitted by the consensus engine.
///
/// Events are published on the engine's broadcast channel; obtain a receiver
/// with `ConsensusEngine::subscribe`. Send failures (for example when there
/// are no subscribers) are ignored by the engine.
#[derive(Debug, Clone)]
pub enum ConsensusEvent {
    /// A new block was proposed.
    BlockProposed {
        /// Height of the proposed block.
        height: u64,
        /// Public key of the validator that produced the block.
        producer: SerializablePublicKey,
        /// Number of transactions in the proposed block.
        tx_count: usize,
    },
    /// A block was finalized.
    BlockFinalized {
        /// Height of the finalized block.
        height: u64,
        /// Identifier of the finalized block.
        block_id: BlockId,
        /// Number of transactions in the finalized block.
        tx_count: usize,
    },
    /// View changed (new leader).
    ViewChanged {
        /// The view number that was just entered.
        view: u64,
        /// Public key of the leader reported for the view.
        leader: SerializablePublicKey,
    },
    /// Consensus state changed. Only emitted when `old` and `new` differ.
    StateChanged { old: EngineState, new: EngineState },
    /// Transaction was included in a block. Emitted once per transaction of a
    /// finalized block.
    TransactionIncluded {
        /// Identifier of the included transaction.
        tx_id: TransactionId,
        /// Height of the finalized block that contains the transaction.
        block_height: u64,
    },
}
95
/// Application interface for the consensus engine.
///
/// This trait is implemented by the Guts node to handle finalized blocks
/// and provide state roots. Implementations must be `Send + Sync` because the
/// engine holds the application behind an `Arc`.
#[async_trait]
pub trait ConsensusApplication: Send + Sync {
    /// Called when a block is finalized.
    ///
    /// The engine calls this after it has stored the block and removed the
    /// block's transactions from the mempool, and before it emits the
    /// finalization events. An error is propagated to the engine's caller.
    async fn on_block_finalized(&self, block: &FinalizedBlock) -> Result<()>;

    /// Computes the state root after applying transactions.
    ///
    /// The engine calls this with the transactions selected for a block
    /// proposal and stores the returned 32 bytes in the proposed block.
    async fn compute_state_root(&self, transactions: &[Transaction]) -> Result<[u8; 32]>;

    /// Verifies that a transaction is valid for inclusion.
    ///
    /// Returns `Ok(())` when the transaction is acceptable and an error
    /// otherwise.
    async fn verify_transaction(&self, transaction: &Transaction) -> Result<()>;

    /// Gets the current height.
    ///
    /// The engine proposes its next block at `current_height() + 1`.
    fn current_height(&self) -> u64;
}
114
/// The consensus engine.
///
/// Owns the validator set, the finalized-block store and the view / height
/// counters, and exposes a broadcast channel of [`ConsensusEvent`]s. All
/// mutable state sits behind `parking_lot::RwLock`s so the public methods
/// only need `&self`.
pub struct ConsensusEngine {
    /// Engine configuration.
    config: EngineConfig,

    /// Our validator key (if we are a validator); `None` otherwise.
    validator_key: Option<commonware_cryptography::ed25519::PrivateKey>,

    /// Validator set, consulted to find the leader of a view.
    validators: Arc<RwLock<ValidatorSet>>,

    /// Transaction mempool; source of transactions for block proposals.
    mempool: Arc<Mempool>,

    /// Finalized blocks by height.
    blocks: Arc<RwLock<HashMap<u64, FinalizedBlock>>>,

    /// Current engine state.
    state: Arc<RwLock<EngineState>>,

    /// Current view number (starts at 0).
    view: Arc<RwLock<u64>>,

    /// Latest finalized height (starts at 0).
    finalized_height: Arc<RwLock<u64>>,

    /// Event broadcaster.
    events: broadcast::Sender<ConsensusEvent>,

    /// Transaction submission channel (sending half).
    tx_sender: mpsc::Sender<Transaction>,

    /// Transaction receiver (owned by engine runner). Held in an `Option` so
    /// that `run` can take it out.
    tx_receiver: Arc<RwLock<Option<mpsc::Receiver<Transaction>>>>,
}
150
151impl ConsensusEngine {
152    /// Creates a new consensus engine.
153    pub fn new(
154        config: EngineConfig,
155        validator_key: Option<commonware_cryptography::ed25519::PrivateKey>,
156        validators: ValidatorSet,
157        mempool: Arc<Mempool>,
158    ) -> Self {
159        let (events, _) = broadcast::channel(1024);
160        let (tx_sender, tx_receiver) = mpsc::channel(10_000);
161
162        Self {
163            config,
164            validator_key,
165            validators: Arc::new(RwLock::new(validators)),
166            mempool,
167            blocks: Arc::new(RwLock::new(HashMap::new())),
168            state: Arc::new(RwLock::new(EngineState::Starting)),
169            view: Arc::new(RwLock::new(0)),
170            finalized_height: Arc::new(RwLock::new(0)),
171            events,
172            tx_sender,
173            tx_receiver: Arc::new(RwLock::new(Some(tx_receiver))),
174        }
175    }
176
177    /// Returns a handle for submitting transactions.
178    pub fn transaction_sender(&self) -> mpsc::Sender<Transaction> {
179        self.tx_sender.clone()
180    }
181
182    /// Subscribes to consensus events.
183    pub fn subscribe(&self) -> broadcast::Receiver<ConsensusEvent> {
184        self.events.subscribe()
185    }
186
187    /// Returns the current engine state.
188    pub fn state(&self) -> EngineState {
189        *self.state.read()
190    }
191
192    /// Returns the current view.
193    pub fn view(&self) -> u64 {
194        *self.view.read()
195    }
196
197    /// Returns the latest finalized height.
198    pub fn finalized_height(&self) -> u64 {
199        *self.finalized_height.read()
200    }
201
202    /// Checks if we are the leader for the current view.
203    pub fn is_leader(&self) -> bool {
204        if let Some(ref key) = self.validator_key {
205            let view = *self.view.read();
206            let validators = self.validators.read();
207            if let Some(leader) = validators.leader_for_view(view) {
208                let our_pubkey = SerializablePublicKey::from_pubkey(&key.public_key());
209                return leader.pubkey == our_pubkey;
210            }
211        }
212        false
213    }
214
215    /// Gets the current leader.
216    pub fn current_leader(&self) -> Option<SerializablePublicKey> {
217        let view = *self.view.read();
218        let validators = self.validators.read();
219        validators.leader_for_view(view).map(|v| v.pubkey.clone())
220    }
221
222    /// Submits a transaction to the mempool.
223    pub async fn submit_transaction(&self, tx: Transaction) -> Result<TransactionId> {
224        // Add to mempool
225        let id = self.mempool.add(tx.clone())?;
226
227        // Send to engine for processing
228        self.tx_sender
229            .send(tx)
230            .await
231            .map_err(|e| ConsensusError::EngineError(e.to_string()))?;
232
233        Ok(id)
234    }
235
236    /// Gets a finalized block by height.
237    pub fn get_block(&self, height: u64) -> Option<FinalizedBlock> {
238        self.blocks.read().get(&height).cloned()
239    }
240
241    /// Gets the validator set.
242    pub fn validators(&self) -> Arc<RwLock<ValidatorSet>> {
243        self.validators.clone()
244    }
245
246    /// Runs the consensus engine.
247    ///
248    /// This is the main event loop that drives consensus.
249    pub async fn run<A: ConsensusApplication>(&self, app: Arc<A>) -> Result<()> {
250        self.set_state(EngineState::Active);
251
252        // Take ownership of the transaction receiver
253        let mut tx_receiver = self
254            .tx_receiver
255            .write()
256            .take()
257            .ok_or_else(|| ConsensusError::EngineError("engine already running".into()))?;
258
259        let block_time = self.config.block_time;
260        let mut block_interval = tokio::time::interval(block_time);
261
262        loop {
263            tokio::select! {
264                // Receive transactions
265                Some(tx) = tx_receiver.recv() => {
266                    self.handle_transaction(tx, &app).await?;
267                }
268
269                // Block proposal timer
270                _ = block_interval.tick() => {
271                    if self.is_leader() && self.config.consensus_enabled {
272                        self.propose_block(&app).await?;
273                    }
274                }
275            }
276        }
277    }
278
279    /// Handles an incoming transaction.
280    async fn handle_transaction<A: ConsensusApplication>(
281        &self,
282        tx: Transaction,
283        app: &Arc<A>,
284    ) -> Result<()> {
285        // Verify the transaction is valid
286        app.verify_transaction(&tx).await?;
287
288        tracing::debug!(
289            tx_id = %tx.id(),
290            kind = tx.kind(),
291            "transaction verified and pending"
292        );
293
294        Ok(())
295    }
296
297    /// Proposes a new block (called when we are the leader).
298    async fn propose_block<A: ConsensusApplication>(&self, app: &Arc<A>) -> Result<()> {
299        let validator_key = self
300            .validator_key
301            .as_ref()
302            .ok_or_else(|| ConsensusError::EngineError("not a validator".into()))?;
303
304        // Get transactions from mempool
305        let transactions = self.mempool.get_for_proposal();
306
307        let height = app.current_height() + 1;
308        let parent = self
309            .blocks
310            .read()
311            .get(&(height - 1))
312            .map(|b| b.id())
313            .unwrap_or(BlockId::GENESIS_PARENT);
314
315        // Compute state root
316        let state_root = app.compute_state_root(&transactions).await?;
317
318        // Create the block
319        let timestamp = std::time::SystemTime::now()
320            .duration_since(std::time::UNIX_EPOCH)
321            .unwrap()
322            .as_millis() as u64;
323
324        let producer = SerializablePublicKey::from_pubkey(&validator_key.public_key());
325
326        let block = Block::new(
327            height,
328            parent,
329            producer.clone(),
330            timestamp,
331            transactions.clone(),
332            state_root,
333        );
334
335        let tx_count = block.tx_count();
336
337        // Emit block proposed event
338        let _ = self.events.send(ConsensusEvent::BlockProposed {
339            height,
340            producer,
341            tx_count,
342        });
343
344        tracing::info!(
345            height,
346            tx_count,
347            block_id = %block.id(),
348            "proposed block"
349        );
350
351        // Finalize the block
352        // In multi-validator mode, this should wait for quorum signatures via P2P.
353        // For now, we simulate immediate finalization (local consensus simulation).
354        // TODO: Implement proper P2P consensus message exchange with commonware.
355        let view = *self.view.read();
356        self.finalize_block(block, view, vec![], app).await?;
357
358        Ok(())
359    }
360
361    /// Finalizes a block after consensus is reached.
362    async fn finalize_block<A: ConsensusApplication>(
363        &self,
364        block: Block,
365        view: u64,
366        signatures: Vec<(SerializablePublicKey, SerializableSignature)>,
367        app: &Arc<A>,
368    ) -> Result<()> {
369        let height = block.height();
370        let block_id = block.id();
371        let tx_count = block.tx_count();
372
373        // Create finalized block
374        let finalized = FinalizedBlock::new(block, view, signatures);
375
376        // Store the block
377        self.blocks.write().insert(height, finalized.clone());
378
379        // Update finalized height
380        *self.finalized_height.write() = height;
381
382        // Remove transactions from mempool
383        let tx_ids: Vec<_> = finalized
384            .block
385            .transactions
386            .iter()
387            .map(|tx| tx.id())
388            .collect();
389        self.mempool.remove_batch(&tx_ids);
390
391        // Notify application
392        app.on_block_finalized(&finalized).await?;
393
394        // Emit events
395        let _ = self.events.send(ConsensusEvent::BlockFinalized {
396            height,
397            block_id,
398            tx_count,
399        });
400
401        for tx in &finalized.block.transactions {
402            let _ = self.events.send(ConsensusEvent::TransactionIncluded {
403                tx_id: tx.id(),
404                block_height: height,
405            });
406        }
407
408        tracing::info!(
409            height,
410            tx_count,
411            block_id = %block_id,
412            "finalized block"
413        );
414
415        Ok(())
416    }
417
418    /// Sets the engine state and emits an event.
419    fn set_state(&self, new_state: EngineState) {
420        let old_state = {
421            let mut state = self.state.write();
422            let old = *state;
423            *state = new_state;
424            old
425        };
426
427        if old_state != new_state {
428            let _ = self.events.send(ConsensusEvent::StateChanged {
429                old: old_state,
430                new: new_state,
431            });
432        }
433    }
434
435    /// Advances to the next view.
436    pub fn advance_view(&self) {
437        let new_view = {
438            let mut view = self.view.write();
439            *view += 1;
440            *view
441        };
442
443        if let Some(leader) = self.current_leader() {
444            let _ = self.events.send(ConsensusEvent::ViewChanged {
445                view: new_view,
446                leader,
447            });
448        }
449
450        tracing::debug!(view = new_view, "advanced to new view");
451    }
452
453    /// Stops the engine.
454    pub fn stop(&self) {
455        self.set_state(EngineState::Stopped);
456    }
457}
458
459/// A no-op application for testing.
460pub struct NoOpApplication {
461    height: RwLock<u64>,
462}
463
464impl NoOpApplication {
465    /// Creates a new no-op application.
466    pub fn new() -> Self {
467        Self {
468            height: RwLock::new(0),
469        }
470    }
471}
472
473impl Default for NoOpApplication {
474    fn default() -> Self {
475        Self::new()
476    }
477}
478
479#[async_trait]
480impl ConsensusApplication for NoOpApplication {
481    async fn on_block_finalized(&self, block: &FinalizedBlock) -> Result<()> {
482        *self.height.write() = block.height();
483        Ok(())
484    }
485
486    async fn compute_state_root(&self, _transactions: &[Transaction]) -> Result<[u8; 32]> {
487        Ok([0u8; 32])
488    }
489
490    async fn verify_transaction(&self, _transaction: &Transaction) -> Result<()> {
491        Ok(())
492    }
493
494    fn current_height(&self) -> u64 {
495        *self.height.read()
496    }
497}
498
#[cfg(test)]
mod tests {
    use super::*;
    use crate::genesis::generate_devnet_genesis;
    use crate::mempool::MempoolConfig;
    use commonware_cryptography::{PrivateKeyExt, Signer};

    /// Builds a signed `CreateRepository` transaction whose key and repository
    /// name are derived from `seed`.
    fn test_tx(seed: u64) -> Transaction {
        use commonware_cryptography::ed25519;

        let key = ed25519::PrivateKey::from_seed(seed);
        let sig = key.sign(Some(b"_GUTS"), b"test");
        let creator = SerializablePublicKey::from_pubkey(&key.public_key());
        let signature = SerializableSignature::from_signature(&sig);

        Transaction::CreateRepository {
            owner: "alice".into(),
            name: format!("repo-{}", seed),
            description: "A test".into(),
            default_branch: "main".into(),
            visibility: "public".into(),
            creator,
            signature,
        }
    }

    /// Config with consensus switched off and every other field at its default.
    fn consensus_disabled() -> EngineConfig {
        EngineConfig {
            consensus_enabled: false,
            ..Default::default()
        }
    }

    /// Builds an engine over a 4-validator devnet genesis, keyed with seed 0,
    /// and returns it together with a handle to its mempool.
    fn devnet_engine(config: EngineConfig) -> (ConsensusEngine, Arc<Mempool>) {
        let validators = generate_devnet_genesis(4).into_validator_set().unwrap();
        let mempool = Arc::new(Mempool::new(MempoolConfig::default()));
        let key = commonware_cryptography::ed25519::PrivateKey::from_seed(0);

        let engine = ConsensusEngine::new(config, Some(key), validators, Arc::clone(&mempool));
        (engine, mempool)
    }

    #[tokio::test]
    async fn test_engine_creation() {
        let (engine, _mempool) = devnet_engine(consensus_disabled());

        assert_eq!(engine.state(), EngineState::Starting);
        assert_eq!(engine.view(), 0);
    }

    #[tokio::test]
    async fn test_transaction_submission() {
        let (engine, mempool) = devnet_engine(consensus_disabled());

        let id = engine.submit_transaction(test_tx(1)).await.unwrap();

        assert!(mempool.contains(&id));
    }

    #[tokio::test]
    async fn test_leader_rotation() {
        let (engine, _mempool) = devnet_engine(EngineConfig::default());

        // View 0: validator 0 should be leader
        assert!(engine.is_leader());

        // Advance view
        engine.advance_view();
        assert_eq!(engine.view(), 1);

        // Now validator 1 should be leader (we are validator 0)
        assert!(!engine.is_leader());
    }
}
581}