1use crate::block::{Block, BlockId, FinalizedBlock};
7use crate::error::{ConsensusError, Result};
8use crate::mempool::Mempool;
9use crate::transaction::{
10 SerializablePublicKey, SerializableSignature, Transaction, TransactionId,
11};
12use crate::validator::ValidatorSet;
13use async_trait::async_trait;
14use commonware_cryptography::Signer;
15use parking_lot::RwLock;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{broadcast, mpsc};
20
/// Tunable parameters for a [`ConsensusEngine`].
#[derive(Clone, Debug)]
pub struct EngineConfig {
    /// Period of the block-production tick driven by `ConsensusEngine::run`.
    pub block_time: Duration,

    /// Upper bound on the number of transactions in one block.
    ///
    /// NOTE(review): not read anywhere in this file — presumably enforced by
    /// the mempool or block builder; confirm against callers.
    pub max_txs_per_block: usize,

    /// Upper bound on the size of one block.
    ///
    /// NOTE(review): presumably measured in bytes (the default is
    /// `10 * 1024 * 1024`); not read anywhere in this file — confirm.
    pub max_block_size: usize,

    /// Multiplier used to derive the view timeout.
    ///
    /// NOTE(review): presumably applied to `block_time`; not read anywhere in
    /// this file — confirm.
    pub view_timeout_multiplier: f64,

    /// When `false`, the engine never proposes blocks, even while it is the
    /// leader for the current view.
    pub consensus_enabled: bool,
}
39
40impl Default for EngineConfig {
41 fn default() -> Self {
42 Self {
43 block_time: Duration::from_millis(2000),
44 max_txs_per_block: 1000,
45 max_block_size: 10 * 1024 * 1024,
46 view_timeout_multiplier: 2.0,
47 consensus_enabled: true,
48 }
49 }
50}
51
/// Lifecycle state reported by a [`ConsensusEngine`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EngineState {
    /// Initial state of a freshly constructed engine.
    Starting,
    /// NOTE(review): never set in this file — presumably the node is catching
    /// up with the chain; confirm.
    Syncing,
    /// The engine's run loop has been started.
    Active,
    /// NOTE(review): never set in this file — presumably the node follows the
    /// chain without participating; confirm.
    Following,
    /// The engine has been asked to stop.
    Stopped,
}
66
67#[derive(Debug, Clone)]
69pub enum ConsensusEvent {
70 BlockProposed {
72 height: u64,
73 producer: SerializablePublicKey,
74 tx_count: usize,
75 },
76 BlockFinalized {
78 height: u64,
79 block_id: BlockId,
80 tx_count: usize,
81 },
82 ViewChanged {
84 view: u64,
85 leader: SerializablePublicKey,
86 },
87 StateChanged { old: EngineState, new: EngineState },
89 TransactionIncluded {
91 tx_id: TransactionId,
92 block_height: u64,
93 },
94}
95
96#[async_trait]
101pub trait ConsensusApplication: Send + Sync {
102 async fn on_block_finalized(&self, block: &FinalizedBlock) -> Result<()>;
104
105 async fn compute_state_root(&self, transactions: &[Transaction]) -> Result<[u8; 32]>;
107
108 async fn verify_transaction(&self, transaction: &Transaction) -> Result<()>;
110
111 fn current_height(&self) -> u64;
113}
114
115pub struct ConsensusEngine {
117 config: EngineConfig,
119
120 validator_key: Option<commonware_cryptography::ed25519::PrivateKey>,
122
123 validators: Arc<RwLock<ValidatorSet>>,
125
126 mempool: Arc<Mempool>,
128
129 blocks: Arc<RwLock<HashMap<u64, FinalizedBlock>>>,
131
132 state: Arc<RwLock<EngineState>>,
134
135 view: Arc<RwLock<u64>>,
137
138 finalized_height: Arc<RwLock<u64>>,
140
141 events: broadcast::Sender<ConsensusEvent>,
143
144 tx_sender: mpsc::Sender<Transaction>,
146
147 tx_receiver: Arc<RwLock<Option<mpsc::Receiver<Transaction>>>>,
149}
150
151impl ConsensusEngine {
152 pub fn new(
154 config: EngineConfig,
155 validator_key: Option<commonware_cryptography::ed25519::PrivateKey>,
156 validators: ValidatorSet,
157 mempool: Arc<Mempool>,
158 ) -> Self {
159 let (events, _) = broadcast::channel(1024);
160 let (tx_sender, tx_receiver) = mpsc::channel(10_000);
161
162 Self {
163 config,
164 validator_key,
165 validators: Arc::new(RwLock::new(validators)),
166 mempool,
167 blocks: Arc::new(RwLock::new(HashMap::new())),
168 state: Arc::new(RwLock::new(EngineState::Starting)),
169 view: Arc::new(RwLock::new(0)),
170 finalized_height: Arc::new(RwLock::new(0)),
171 events,
172 tx_sender,
173 tx_receiver: Arc::new(RwLock::new(Some(tx_receiver))),
174 }
175 }
176
177 pub fn transaction_sender(&self) -> mpsc::Sender<Transaction> {
179 self.tx_sender.clone()
180 }
181
182 pub fn subscribe(&self) -> broadcast::Receiver<ConsensusEvent> {
184 self.events.subscribe()
185 }
186
187 pub fn state(&self) -> EngineState {
189 *self.state.read()
190 }
191
192 pub fn view(&self) -> u64 {
194 *self.view.read()
195 }
196
197 pub fn finalized_height(&self) -> u64 {
199 *self.finalized_height.read()
200 }
201
202 pub fn is_leader(&self) -> bool {
204 if let Some(ref key) = self.validator_key {
205 let view = *self.view.read();
206 let validators = self.validators.read();
207 if let Some(leader) = validators.leader_for_view(view) {
208 let our_pubkey = SerializablePublicKey::from_pubkey(&key.public_key());
209 return leader.pubkey == our_pubkey;
210 }
211 }
212 false
213 }
214
215 pub fn current_leader(&self) -> Option<SerializablePublicKey> {
217 let view = *self.view.read();
218 let validators = self.validators.read();
219 validators.leader_for_view(view).map(|v| v.pubkey.clone())
220 }
221
222 pub async fn submit_transaction(&self, tx: Transaction) -> Result<TransactionId> {
224 let id = self.mempool.add(tx.clone())?;
226
227 self.tx_sender
229 .send(tx)
230 .await
231 .map_err(|e| ConsensusError::EngineError(e.to_string()))?;
232
233 Ok(id)
234 }
235
236 pub fn get_block(&self, height: u64) -> Option<FinalizedBlock> {
238 self.blocks.read().get(&height).cloned()
239 }
240
241 pub fn validators(&self) -> Arc<RwLock<ValidatorSet>> {
243 self.validators.clone()
244 }
245
246 pub async fn run<A: ConsensusApplication>(&self, app: Arc<A>) -> Result<()> {
250 self.set_state(EngineState::Active);
251
252 let mut tx_receiver = self
254 .tx_receiver
255 .write()
256 .take()
257 .ok_or_else(|| ConsensusError::EngineError("engine already running".into()))?;
258
259 let block_time = self.config.block_time;
260 let mut block_interval = tokio::time::interval(block_time);
261
262 loop {
263 tokio::select! {
264 Some(tx) = tx_receiver.recv() => {
266 self.handle_transaction(tx, &app).await?;
267 }
268
269 _ = block_interval.tick() => {
271 if self.is_leader() && self.config.consensus_enabled {
272 self.propose_block(&app).await?;
273 }
274 }
275 }
276 }
277 }
278
279 async fn handle_transaction<A: ConsensusApplication>(
281 &self,
282 tx: Transaction,
283 app: &Arc<A>,
284 ) -> Result<()> {
285 app.verify_transaction(&tx).await?;
287
288 tracing::debug!(
289 tx_id = %tx.id(),
290 kind = tx.kind(),
291 "transaction verified and pending"
292 );
293
294 Ok(())
295 }
296
297 async fn propose_block<A: ConsensusApplication>(&self, app: &Arc<A>) -> Result<()> {
299 let validator_key = self
300 .validator_key
301 .as_ref()
302 .ok_or_else(|| ConsensusError::EngineError("not a validator".into()))?;
303
304 let transactions = self.mempool.get_for_proposal();
306
307 let height = app.current_height() + 1;
308 let parent = self
309 .blocks
310 .read()
311 .get(&(height - 1))
312 .map(|b| b.id())
313 .unwrap_or(BlockId::GENESIS_PARENT);
314
315 let state_root = app.compute_state_root(&transactions).await?;
317
318 let timestamp = std::time::SystemTime::now()
320 .duration_since(std::time::UNIX_EPOCH)
321 .unwrap()
322 .as_millis() as u64;
323
324 let producer = SerializablePublicKey::from_pubkey(&validator_key.public_key());
325
326 let block = Block::new(
327 height,
328 parent,
329 producer.clone(),
330 timestamp,
331 transactions.clone(),
332 state_root,
333 );
334
335 let tx_count = block.tx_count();
336
337 let _ = self.events.send(ConsensusEvent::BlockProposed {
339 height,
340 producer,
341 tx_count,
342 });
343
344 tracing::info!(
345 height,
346 tx_count,
347 block_id = %block.id(),
348 "proposed block"
349 );
350
351 let view = *self.view.read();
356 self.finalize_block(block, view, vec![], app).await?;
357
358 Ok(())
359 }
360
361 async fn finalize_block<A: ConsensusApplication>(
363 &self,
364 block: Block,
365 view: u64,
366 signatures: Vec<(SerializablePublicKey, SerializableSignature)>,
367 app: &Arc<A>,
368 ) -> Result<()> {
369 let height = block.height();
370 let block_id = block.id();
371 let tx_count = block.tx_count();
372
373 let finalized = FinalizedBlock::new(block, view, signatures);
375
376 self.blocks.write().insert(height, finalized.clone());
378
379 *self.finalized_height.write() = height;
381
382 let tx_ids: Vec<_> = finalized
384 .block
385 .transactions
386 .iter()
387 .map(|tx| tx.id())
388 .collect();
389 self.mempool.remove_batch(&tx_ids);
390
391 app.on_block_finalized(&finalized).await?;
393
394 let _ = self.events.send(ConsensusEvent::BlockFinalized {
396 height,
397 block_id,
398 tx_count,
399 });
400
401 for tx in &finalized.block.transactions {
402 let _ = self.events.send(ConsensusEvent::TransactionIncluded {
403 tx_id: tx.id(),
404 block_height: height,
405 });
406 }
407
408 tracing::info!(
409 height,
410 tx_count,
411 block_id = %block_id,
412 "finalized block"
413 );
414
415 Ok(())
416 }
417
418 fn set_state(&self, new_state: EngineState) {
420 let old_state = {
421 let mut state = self.state.write();
422 let old = *state;
423 *state = new_state;
424 old
425 };
426
427 if old_state != new_state {
428 let _ = self.events.send(ConsensusEvent::StateChanged {
429 old: old_state,
430 new: new_state,
431 });
432 }
433 }
434
435 pub fn advance_view(&self) {
437 let new_view = {
438 let mut view = self.view.write();
439 *view += 1;
440 *view
441 };
442
443 if let Some(leader) = self.current_leader() {
444 let _ = self.events.send(ConsensusEvent::ViewChanged {
445 view: new_view,
446 leader,
447 });
448 }
449
450 tracing::debug!(view = new_view, "advanced to new view");
451 }
452
453 pub fn stop(&self) {
455 self.set_state(EngineState::Stopped);
456 }
457}
458
459pub struct NoOpApplication {
461 height: RwLock<u64>,
462}
463
464impl NoOpApplication {
465 pub fn new() -> Self {
467 Self {
468 height: RwLock::new(0),
469 }
470 }
471}
472
473impl Default for NoOpApplication {
474 fn default() -> Self {
475 Self::new()
476 }
477}
478
479#[async_trait]
480impl ConsensusApplication for NoOpApplication {
481 async fn on_block_finalized(&self, block: &FinalizedBlock) -> Result<()> {
482 *self.height.write() = block.height();
483 Ok(())
484 }
485
486 async fn compute_state_root(&self, _transactions: &[Transaction]) -> Result<[u8; 32]> {
487 Ok([0u8; 32])
488 }
489
490 async fn verify_transaction(&self, _transaction: &Transaction) -> Result<()> {
491 Ok(())
492 }
493
494 fn current_height(&self) -> u64 {
495 *self.height.read()
496 }
497}
498
#[cfg(test)]
mod tests {
    use super::*;
    use crate::genesis::generate_devnet_genesis;
    use crate::mempool::MempoolConfig;
    use commonware_cryptography::{PrivateKeyExt, Signer};

    /// Builds a `CreateRepository` transaction signed by the key derived
    /// from `seed`; the repository name embeds the seed.
    fn test_tx(seed: u64) -> Transaction {
        use commonware_cryptography::ed25519;

        let signer = ed25519::PrivateKey::from_seed(seed);
        let signature = signer.sign(Some(b"_GUTS"), b"test");

        Transaction::CreateRepository {
            owner: "alice".into(),
            name: format!("repo-{}", seed),
            description: "A test".into(),
            default_branch: "main".into(),
            visibility: "public".into(),
            creator: SerializablePublicKey::from_pubkey(&signer.public_key()),
            signature: SerializableSignature::from_signature(&signature),
        }
    }

    /// Builds an engine over a four-validator devnet genesis, keyed with
    /// seed 0, and returns it together with its mempool.
    fn devnet_engine(config: EngineConfig) -> (ConsensusEngine, Arc<Mempool>) {
        let validators = generate_devnet_genesis(4).into_validator_set().unwrap();
        let mempool = Arc::new(Mempool::new(MempoolConfig::default()));
        let key = commonware_cryptography::ed25519::PrivateKey::from_seed(0);
        let engine = ConsensusEngine::new(config, Some(key), validators, mempool.clone());
        (engine, mempool)
    }

    /// Configuration with block production switched off.
    fn consensus_disabled() -> EngineConfig {
        EngineConfig {
            consensus_enabled: false,
            ..Default::default()
        }
    }

    /// A new engine starts in `Starting` at view 0.
    #[tokio::test]
    async fn test_engine_creation() {
        let (engine, _mempool) = devnet_engine(consensus_disabled());

        assert_eq!(engine.state(), EngineState::Starting);
        assert_eq!(engine.view(), 0);
    }

    /// A submitted transaction ends up in the mempool.
    #[tokio::test]
    async fn test_transaction_submission() {
        let (engine, mempool) = devnet_engine(consensus_disabled());

        let id = engine.submit_transaction(test_tx(1)).await.unwrap();

        assert!(mempool.contains(&id));
    }

    /// The seed-0 validator leads view 0 and loses leadership at view 1.
    #[tokio::test]
    async fn test_leader_rotation() {
        let (engine, _mempool) = devnet_engine(EngineConfig::default());

        assert!(engine.is_leader());

        engine.advance_view();
        assert_eq!(engine.view(), 1);

        assert!(!engine.is_leader());
    }
}