guts_node/
consensus_simplex.rs1use crate::p2p::{parse_bootstrapper, parse_private_key, TokioContext};
16use commonware_cryptography::{ed25519::PublicKey, Signer};
17use commonware_runtime::{tokio as cw_tokio, Runner};
18use guts_consensus::simplex::{Config as SimplexConfig, Engine as SimplexEngine};
19use std::{net::SocketAddr, path::PathBuf};
20use tracing::{error, info, warn};
21
/// Configuration for running this node's Simplex BFT consensus engine.
///
/// `Debug` is implemented by hand (rather than derived) so that the private
/// key can never be written to logs by an accidental `{:?}`.
#[derive(Clone)]
pub struct SimplexConsensusConfig {
    /// Hex-encoded private key of this node; the node's public key is derived
    /// from it at startup.
    pub private_key_hex: String,
    /// Address the P2P layer listens on.
    pub p2p_addr: SocketAddr,
    /// Address advertised to peers; `p2p_addr` is used when this is `None`.
    pub external_addr: Option<SocketAddr>,
    /// Bootstrapper entries, each parsed into a (public key, socket address)
    /// pair at startup.
    pub bootstrappers: Vec<String>,
    /// Hex-encoded public keys of the validator set. The local node's key is
    /// appended at startup when it is not listed.
    pub participants: Vec<String>,
    /// Storage directory handed to the runtime.
    pub data_dir: PathBuf,
    /// Forwarded unchanged to the P2P configuration's `local` flag.
    pub local: bool,
    /// Forwarded unchanged to the P2P configuration's `mailbox_size`.
    pub mailbox_size: usize,
    /// Forwarded unchanged to the P2P configuration's `message_backlog`.
    pub message_backlog: usize,
    /// Number of worker threads the runtime is configured with.
    pub worker_threads: usize,
}

impl std::fmt::Debug for SimplexConsensusConfig {
    /// Formats every field except `private_key_hex`, which is replaced by a
    /// fixed placeholder so secret key material is never rendered.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SimplexConsensusConfig")
            .field("private_key_hex", &"<redacted>")
            .field("p2p_addr", &self.p2p_addr)
            .field("external_addr", &self.external_addr)
            .field("bootstrappers", &self.bootstrappers)
            .field("participants", &self.participants)
            .field("data_dir", &self.data_dir)
            .field("local", &self.local)
            .field("mailbox_size", &self.mailbox_size)
            .field("message_backlog", &self.message_backlog)
            .field("worker_threads", &self.worker_threads)
            .finish()
    }
}
46
47impl Default for SimplexConsensusConfig {
48 fn default() -> Self {
49 Self {
50 private_key_hex: String::new(),
51 p2p_addr: "0.0.0.0:9000".parse().unwrap(),
52 external_addr: None,
53 bootstrappers: Vec::new(),
54 participants: Vec::new(),
55 data_dir: PathBuf::from("./data"),
56 local: true,
57 mailbox_size: 1024,
58 message_backlog: 1024,
59 worker_threads: 4,
60 }
61 }
62}
63
64pub struct SimplexConsensusHandle {
66 pub public_key: PublicKey,
68}
69
70pub fn start_simplex_consensus(
78 config: SimplexConsensusConfig,
79) -> Result<SimplexConsensusHandle, String> {
80 let private_key = parse_private_key(&config.private_key_hex)?;
82 let public_key = private_key.public_key();
83
84 info!(
85 public_key = hex::encode(public_key.as_ref()),
86 p2p_addr = %config.p2p_addr,
87 "Starting Simplex BFT consensus"
88 );
89
90 let mut participants: Vec<PublicKey> = Vec::new();
92 for pk_hex in &config.participants {
93 let pk = crate::p2p::parse_public_key(pk_hex)?;
94 participants.push(pk);
95 }
96
97 if !participants.contains(&public_key) {
99 participants.push(public_key.clone());
100 }
101
102 info!(
103 participant_count = participants.len(),
104 "Parsed validator set"
105 );
106
107 let mut bootstrappers = Vec::new();
109 for bs in &config.bootstrappers {
110 let (pk, addr) = parse_bootstrapper(bs)?;
111 bootstrappers.push((pk, addr));
112 }
113
114 let p2p_addr = config.p2p_addr;
116 let external_addr = config.external_addr.unwrap_or(p2p_addr);
117 let data_dir = config.data_dir.clone();
118 let local = config.local;
119 let mailbox_size = config.mailbox_size;
120 let message_backlog = config.message_backlog;
121 let worker_threads = config.worker_threads;
122 let pk_clone = public_key.clone();
123
124 std::thread::spawn(move || {
126 let runtime_cfg = cw_tokio::Config::default()
128 .with_tcp_nodelay(Some(true))
129 .with_worker_threads(worker_threads)
130 .with_storage_directory(data_dir)
131 .with_catch_panics(false);
132
133 let executor = cw_tokio::Runner::new(runtime_cfg);
134
135 executor.start(move |context: TokioContext| async move {
137 run_consensus_engine(
138 context,
139 private_key,
140 participants,
141 bootstrappers,
142 p2p_addr,
143 external_addr,
144 local,
145 mailbox_size,
146 message_backlog,
147 )
148 .await;
149 });
150 });
151
152 Ok(SimplexConsensusHandle {
153 public_key: pk_clone,
154 })
155}
156
157#[allow(clippy::too_many_arguments)]
159async fn run_consensus_engine(
160 context: TokioContext,
161 private_key: commonware_cryptography::ed25519::PrivateKey,
162 participants: Vec<PublicKey>,
163 bootstrappers: Vec<(PublicKey, SocketAddr)>,
164 p2p_addr: SocketAddr,
165 external_addr: SocketAddr,
166 local: bool,
167 mailbox_size: usize,
168 message_backlog: usize,
169) {
170 use crate::p2p::{AuthenticatedNetwork, AuthenticatedP2pConfig};
171 use futures::future::try_join_all;
172
173 let public_key = private_key.public_key();
174
175 info!(
176 public_key = hex::encode(public_key.as_ref()),
177 "Consensus engine starting"
178 );
179
180 let mut p2p_config = AuthenticatedP2pConfig::new(private_key.clone(), p2p_addr.port());
182 p2p_config.listen_addr = p2p_addr;
183 p2p_config.external_addr = external_addr;
184 p2p_config.bootstrappers = bootstrappers.clone();
185 p2p_config.mailbox_size = mailbox_size;
186 p2p_config.message_backlog = message_backlog;
187 p2p_config.local = local;
188
189 let (network, channels, network_handle) =
191 AuthenticatedNetwork::new(context.clone(), p2p_config, participants.clone()).await;
192
193 info!(
194 public_key = hex::encode(network.public_key.as_ref()),
195 "P2P network initialized"
196 );
197
198 let simplex_config = SimplexConfig::new(
200 network.oracle.clone(),
201 public_key.clone(),
202 private_key,
203 participants,
204 );
205
206 let simplex_engine = SimplexEngine::new(context.clone(), simplex_config).await;
208
209 info!("Simplex BFT engine created, starting...");
210
211 let engine_handle = simplex_engine.start(
213 channels.pending,
214 channels.recovered,
215 channels.resolver,
216 channels.broadcast,
217 channels.marshal,
218 );
219
220 info!("Simplex BFT consensus running");
221
222 if let Err(e) = try_join_all(vec![network_handle, engine_handle]).await {
224 error!(?e, "Consensus engine task failed");
225 } else {
226 warn!("Consensus engine stopped unexpectedly");
227 }
228}
229
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration enables `local` and uses four worker threads.
    #[test]
    fn test_default_config() {
        let SimplexConsensusConfig {
            local,
            worker_threads,
            ..
        } = SimplexConsensusConfig::default();

        assert!(local);
        assert_eq!(worker_threads, 4);
    }
}