guts_node/
consensus_simplex.rs

1//! Simplex BFT consensus integration for guts-node.
2//!
3//! This module provides the integration between the HTTP API server and the
4//! real Simplex BFT consensus engine from commonware.
5//!
6//! # Architecture
7//!
8//! The consensus engine runs in its own commonware runtime context, while the
9//! HTTP server runs in the main Tokio runtime. They communicate via:
10//!
11//! - **Mempool**: Transactions are submitted via the HTTP API
12//! - **State**: Finalized blocks update shared application state
13//! - **P2P**: Consensus messages are exchanged via authenticated channels
14
15use crate::p2p::{parse_bootstrapper, parse_private_key, TokioContext};
16use commonware_cryptography::{ed25519::PublicKey, Signer};
17use commonware_runtime::{tokio as cw_tokio, Runner};
18use guts_consensus::simplex::{Config as SimplexConfig, Engine as SimplexEngine};
19use std::{net::SocketAddr, path::PathBuf};
20use tracing::{error, info, warn};
21
22/// Configuration for the Simplex BFT consensus.
23#[derive(Clone)]
24pub struct SimplexConsensusConfig {
25    /// Private key hex string for this validator.
26    pub private_key_hex: String,
27    /// P2P listen address.
28    pub p2p_addr: SocketAddr,
29    /// External P2P address (for NAT).
30    pub external_addr: Option<SocketAddr>,
31    /// Bootstrapper addresses (format: "pubkey@host:port").
32    pub bootstrappers: Vec<String>,
33    /// Participant public keys (hex strings).
34    pub participants: Vec<String>,
35    /// Storage directory.
36    pub data_dir: PathBuf,
37    /// Local mode (relaxed timing for development).
38    pub local: bool,
39    /// Mailbox size.
40    pub mailbox_size: usize,
41    /// Message backlog size.
42    pub message_backlog: usize,
43    /// Worker threads for the consensus runtime.
44    pub worker_threads: usize,
45}
46
47impl Default for SimplexConsensusConfig {
48    fn default() -> Self {
49        Self {
50            private_key_hex: String::new(),
51            p2p_addr: "0.0.0.0:9000".parse().unwrap(),
52            external_addr: None,
53            bootstrappers: Vec::new(),
54            participants: Vec::new(),
55            data_dir: PathBuf::from("./data"),
56            local: true,
57            mailbox_size: 1024,
58            message_backlog: 1024,
59            worker_threads: 4,
60        }
61    }
62}
63
64/// Handle to a running Simplex consensus engine.
65pub struct SimplexConsensusHandle {
66    /// The public key of this validator.
67    pub public_key: PublicKey,
68}
69
70/// Start the Simplex BFT consensus engine.
71///
72/// This spawns a new thread that runs the commonware runtime with the
73/// consensus engine. The engine will connect to other validators via P2P
74/// and participate in BFT consensus.
75///
76/// Returns a handle that can be used to interact with the consensus engine.
77pub fn start_simplex_consensus(
78    config: SimplexConsensusConfig,
79) -> Result<SimplexConsensusHandle, String> {
80    // Parse private key
81    let private_key = parse_private_key(&config.private_key_hex)?;
82    let public_key = private_key.public_key();
83
84    info!(
85        public_key = hex::encode(public_key.as_ref()),
86        p2p_addr = %config.p2p_addr,
87        "Starting Simplex BFT consensus"
88    );
89
90    // Parse participants
91    let mut participants: Vec<PublicKey> = Vec::new();
92    for pk_hex in &config.participants {
93        let pk = crate::p2p::parse_public_key(pk_hex)?;
94        participants.push(pk);
95    }
96
97    // Make sure we're in the participant list
98    if !participants.contains(&public_key) {
99        participants.push(public_key.clone());
100    }
101
102    info!(
103        participant_count = participants.len(),
104        "Parsed validator set"
105    );
106
107    // Parse bootstrappers
108    let mut bootstrappers = Vec::new();
109    for bs in &config.bootstrappers {
110        let (pk, addr) = parse_bootstrapper(bs)?;
111        bootstrappers.push((pk, addr));
112    }
113
114    // Clone config values for the thread
115    let p2p_addr = config.p2p_addr;
116    let external_addr = config.external_addr.unwrap_or(p2p_addr);
117    let data_dir = config.data_dir.clone();
118    let local = config.local;
119    let mailbox_size = config.mailbox_size;
120    let message_backlog = config.message_backlog;
121    let worker_threads = config.worker_threads;
122    let pk_clone = public_key.clone();
123
124    // Spawn the consensus engine in a separate thread with its own runtime
125    std::thread::spawn(move || {
126        // Configure commonware tokio runtime
127        let runtime_cfg = cw_tokio::Config::default()
128            .with_tcp_nodelay(Some(true))
129            .with_worker_threads(worker_threads)
130            .with_storage_directory(data_dir)
131            .with_catch_panics(false);
132
133        let executor = cw_tokio::Runner::new(runtime_cfg);
134
135        // Start the runtime
136        executor.start(move |context: TokioContext| async move {
137            run_consensus_engine(
138                context,
139                private_key,
140                participants,
141                bootstrappers,
142                p2p_addr,
143                external_addr,
144                local,
145                mailbox_size,
146                message_backlog,
147            )
148            .await;
149        });
150    });
151
152    Ok(SimplexConsensusHandle {
153        public_key: pk_clone,
154    })
155}
156
157/// Run the consensus engine (called within the commonware runtime).
158#[allow(clippy::too_many_arguments)]
159async fn run_consensus_engine(
160    context: TokioContext,
161    private_key: commonware_cryptography::ed25519::PrivateKey,
162    participants: Vec<PublicKey>,
163    bootstrappers: Vec<(PublicKey, SocketAddr)>,
164    p2p_addr: SocketAddr,
165    external_addr: SocketAddr,
166    local: bool,
167    mailbox_size: usize,
168    message_backlog: usize,
169) {
170    use crate::p2p::{AuthenticatedNetwork, AuthenticatedP2pConfig};
171    use futures::future::try_join_all;
172
173    let public_key = private_key.public_key();
174
175    info!(
176        public_key = hex::encode(public_key.as_ref()),
177        "Consensus engine starting"
178    );
179
180    // Create P2P config
181    let mut p2p_config = AuthenticatedP2pConfig::new(private_key.clone(), p2p_addr.port());
182    p2p_config.listen_addr = p2p_addr;
183    p2p_config.external_addr = external_addr;
184    p2p_config.bootstrappers = bootstrappers.clone();
185    p2p_config.mailbox_size = mailbox_size;
186    p2p_config.message_backlog = message_backlog;
187    p2p_config.local = local;
188
189    // Create the P2P network
190    let (network, channels, network_handle) =
191        AuthenticatedNetwork::new(context.clone(), p2p_config, participants.clone()).await;
192
193    info!(
194        public_key = hex::encode(network.public_key.as_ref()),
195        "P2P network initialized"
196    );
197
198    // Create the simplex engine config
199    let simplex_config = SimplexConfig::new(
200        network.oracle.clone(),
201        public_key.clone(),
202        private_key,
203        participants,
204    );
205
206    // Create the simplex engine
207    let simplex_engine = SimplexEngine::new(context.clone(), simplex_config).await;
208
209    info!("Simplex BFT engine created, starting...");
210
211    // Start the simplex engine with P2P channels
212    let engine_handle = simplex_engine.start(
213        channels.pending,
214        channels.recovered,
215        channels.resolver,
216        channels.broadcast,
217        channels.marshal,
218    );
219
220    info!("Simplex BFT consensus running");
221
222    // Wait for any task to complete (they should run forever)
223    if let Err(e) = try_join_all(vec![network_handle, engine_handle]).await {
224        error!(?e, "Consensus engine task failed");
225    } else {
226        warn!("Consensus engine stopped unexpectedly");
227    }
228}
229
230#[cfg(test)]
231mod tests {
232    use super::*;
233
234    #[test]
235    fn test_default_config() {
236        let config = SimplexConsensusConfig::default();
237        assert!(config.local);
238        assert_eq!(config.worker_threads, 4);
239    }
240}