use super::{
    application::{self, Actor as ApplicationActor, Mailbox as ApplicationMailbox},
    block::SimplexBlock,
    types::Scheme,
};
use commonware_broadcast::buffered;
use commonware_consensus::{
    marshal::{self, ingress::handler},
    simplex::{self, Engine as Consensus},
};
use commonware_cryptography::{ed25519::PublicKey, sha256::Digest};
use commonware_p2p::{Blocker, Receiver, Sender};
use commonware_resolver::Resolver;
use commonware_runtime::{buffer::PoolRef, Clock, Handle, Metrics, Spawner, Storage};
use commonware_utils::{set::Ordered, NZUsize, NZU64};
use futures::{channel::mpsc, future::try_join_all};
use governor::{clock::Clock as GClock, Quota};
use rand::{CryptoRng, Rng};
use std::{marker::PhantomData, num::NonZero, sync::Arc, time::Duration};
use tracing::{error, info, warn};
46
/// Shared, thread-safe callback invoked with each finalized block.
pub type FinalizedCallback = Arc<dyn Fn(&SimplexBlock) + Send + Sync>;

/// Namespace bytes passed to both the marshal and simplex configurations.
pub const NAMESPACE: &[u8] = b"guts-consensus";

/// The single epoch used by this deployment (passed to the simplex config).
pub const EPOCH: u64 = 0;

/// Epoch length handed to marshal; `u64::MAX` means the epoch never rolls over.
pub const EPOCH_LENGTH: u64 = u64::MAX;

// Storage sizing knobs forwarded into `marshal::Config` below.
const PRUNABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(4_096);
const IMMUTABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(262_144);
const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;
const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16);
// 1 GiB journal target before rotation.
const FREEZER_JOURNAL_TARGET_SIZE: u64 = 1024 * 1024 * 1024;
// Compression level 3 — presumably the freezer journal codec's level; TODO confirm codec.
const FREEZER_JOURNAL_COMPRESSION: Option<u8> = Some(3);
// Buffer sizes shared by marshal and the simplex core (replay: 8 MiB, write: 1 MiB).
const REPLAY_BUFFER: NonZero<usize> = NZUsize!(8 * 1024 * 1024);
const WRITE_BUFFER: NonZero<usize> = NZUsize!(1024 * 1024);
// Page cache backing storage components: 8_192 pages of 4_096 bytes.
const BUFFER_POOL_PAGE_SIZE: NonZero<usize> = NZUsize!(4_096);
const BUFFER_POOL_CAPACITY: NonZero<usize> = NZUsize!(8_192);
const MAX_REPAIR: u64 = 20;
// `view_retention_timeout = activity_timeout * this` (see marshal config in `Engine::new`).
const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;

/// Marshal's mailbox doubles as the consensus reporter type.
type SimpleReporter = marshal::Mailbox<Scheme, SimplexBlock>;
77
78#[derive(Clone)]
80pub struct StaticSchemeProvider(Arc<Scheme>);
81
82impl marshal::SchemeProvider for StaticSchemeProvider {
83 type Scheme = Scheme;
84
85 fn scheme(&self, _epoch: u64) -> Option<Arc<Scheme>> {
86 Some(self.0.clone())
87 }
88}
89
90impl From<Scheme> for StaticSchemeProvider {
91 fn from(scheme: Scheme) -> Self {
92 Self(Arc::new(scheme))
93 }
94}
95
/// Configuration for the consensus [`Engine`].
///
/// All fields are public plain data; construct via [`Config::new`] and
/// overwrite individual fields as needed before building the engine.
#[derive(Clone)]
pub struct Config<B: Blocker<PublicKey = PublicKey>> {
    /// Peer blocker handed to the simplex core for banning misbehaving peers.
    pub blocker: B,

    /// Prefix for every storage partition created by the engine's components.
    pub partition_prefix: String,

    /// Initial table size for the blocks freezer (forwarded to marshal).
    pub blocks_freezer_table_initial_size: u32,

    /// Initial table size for the finalized freezer.
    /// NOTE(review): not consumed anywhere in this file — confirm it is wired
    /// up elsewhere or should be passed into `marshal::Config`.
    pub finalized_freezer_table_initial_size: u32,

    /// This node's public identity (used as the broadcast engine's key).
    pub me: PublicKey,

    /// This node's signing key (consumed when building the `Scheme`).
    pub private_key: commonware_cryptography::ed25519::PrivateKey,

    /// Ordered set of validator public keys.
    pub participants: Ordered<PublicKey>,

    /// Capacity used for every component actor's mailbox.
    pub mailbox_size: usize,

    /// Deque size for the buffered broadcast engine.
    pub deque_size: usize,

    /// Time the leader has to propose before peers time out.
    pub leader_timeout: Duration,

    /// Time to wait for a notarization before nullifying a view.
    pub notarization_timeout: Duration,

    /// Retry interval for rebroadcasting nullifies.
    pub nullify_retry: Duration,

    /// Timeout for individual fetch requests.
    pub fetch_timeout: Duration,

    /// Activity timeout in views; also scales marshal's view retention
    /// (multiplied by `SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER`).
    pub activity_timeout: u64,

    /// Views without progress before skipping a leader.
    pub skip_timeout: u64,

    /// Maximum items per fetch request.
    pub max_fetch_count: usize,

    /// Maximum number of concurrent fetches.
    pub fetch_concurrent: usize,

    /// Per-peer rate limit applied to fetch requests.
    pub fetch_rate_per_peer: Quota,

    /// Optional callback fired on finalization.
    /// NOTE(review): stored but never read in this file — confirm where it is
    /// invoked (it is not forwarded in `Engine::new`).
    pub on_finalized: Option<FinalizedCallback>,
}
156
157impl<B: Blocker<PublicKey = PublicKey>> Config<B> {
158 pub fn new(
160 blocker: B,
161 me: PublicKey,
162 private_key: commonware_cryptography::ed25519::PrivateKey,
163 participants: Vec<PublicKey>,
164 ) -> Self {
165 Self {
166 blocker,
167 partition_prefix: "guts".to_string(),
168 blocks_freezer_table_initial_size: 2u32.pow(21), finalized_freezer_table_initial_size: 2u32.pow(21),
170 me,
171 private_key,
172 participants: participants.into_iter().collect(),
173 mailbox_size: 1024,
174 deque_size: 10,
175 leader_timeout: Duration::from_secs(1),
176 notarization_timeout: Duration::from_secs(2),
177 nullify_retry: Duration::from_secs(10),
178 fetch_timeout: Duration::from_secs(2),
179 activity_timeout: 256,
180 skip_timeout: 32,
181 max_fetch_count: 16,
182 fetch_concurrent: 4,
183 fetch_rate_per_peer: Quota::per_second(std::num::NonZeroU32::new(128).unwrap()),
184 on_finalized: None,
185 }
186 }
187
188 pub fn on_finalized<F>(mut self, callback: F) -> Self
190 where
191 F: Fn(&SimplexBlock) + Send + Sync + 'static,
192 {
193 self.on_finalized = Some(Arc::new(callback));
194 self
195 }
196}
197
/// Simplex BFT consensus engine: owns the application, broadcast buffer,
/// marshal, and simplex consensus actors, wired together via their mailboxes.
/// Construct with [`Engine::new`], then spawn everything with `start`.
pub struct Engine<E, B>
where
    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics + Clone,
    B: Blocker<PublicKey = PublicKey>,
{
    // Runtime context used to spawn the component tasks.
    context: E,

    // Actor/mailbox pairs; each actor is started in `run` and driven through
    // its mailbox by the other components.
    application: ApplicationActor<E>,
    application_mailbox: ApplicationMailbox,
    buffer: buffered::Engine<E, PublicKey, SimplexBlock>,
    buffer_mailbox: buffered::Mailbox<PublicKey, SimplexBlock>,
    marshal: marshal::Actor<E, SimplexBlock, StaticSchemeProvider, Scheme>,
    marshal_mailbox: marshal::Mailbox<Scheme, SimplexBlock>,

    // The simplex core: the application mailbox serves as both automaton and
    // relay, and marshal's mailbox is the activity reporter.
    consensus: Consensus<
        E,
        PublicKey,
        Scheme,
        B,
        Digest,
        ApplicationMailbox,
        ApplicationMailbox,
        SimpleReporter,
    >,
}
224
impl<E, B> Engine<E, B>
where
    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics + Clone,
    B: Blocker<PublicKey = PublicKey>,
{
    /// Construct all consensus components without starting any of them;
    /// call `start` to spawn the work.
    ///
    /// Async because `marshal::Actor::init` is awaited (presumably it
    /// restores persisted state before returning — confirm against marshal
    /// docs).
    pub async fn new(context: E, cfg: Config<B>) -> Self {
        // Application actor: serves automaton/relay requests from consensus.
        let (application, application_mailbox) = ApplicationActor::new(
            context.clone(),
            application::Config {
                mailbox_size: cfg.mailbox_size,
            },
        );

        // Buffered broadcast engine keyed by our own public key.
        let (buffer, buffer_mailbox) = buffered::Engine::new(
            context.clone(),
            buffered::Config {
                public_key: cfg.me.clone(),
                mailbox_size: cfg.mailbox_size,
                deque_size: cfg.deque_size,
                priority: true,
                codec_config: (),
            },
        );

        // Shared page cache used by marshal's journal and the simplex core.
        let buffer_pool = PoolRef::new(BUFFER_POOL_PAGE_SIZE, BUFFER_POOL_CAPACITY);

        // Signing scheme over the full participant set; consumes the private key.
        let scheme = Scheme::new(cfg.participants.clone(), cfg.private_key);

        // Marshal: block storage/backfill actor. Note the scheme is wrapped
        // in a StaticSchemeProvider via `.into()`.
        let (marshal, marshal_mailbox) = marshal::Actor::init(
            context.clone(),
            marshal::Config {
                scheme_provider: scheme.clone().into(),
                epoch_length: EPOCH_LENGTH,
                partition_prefix: cfg.partition_prefix.clone(),
                mailbox_size: cfg.mailbox_size,
                // Retain views for a generous multiple of the activity
                // timeout so lagging peers can still sync them.
                view_retention_timeout: cfg
                    .activity_timeout
                    .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
                namespace: NAMESPACE.to_vec(),
                prunable_items_per_section: PRUNABLE_ITEMS_PER_SECTION,
                immutable_items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
                freezer_table_initial_size: cfg.blocks_freezer_table_initial_size,
                freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
                freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
                freezer_journal_target_size: FREEZER_JOURNAL_TARGET_SIZE,
                freezer_journal_compression: FREEZER_JOURNAL_COMPRESSION,
                freezer_journal_buffer_pool: buffer_pool.clone(),
                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
                block_codec_config: (),
                max_repair: MAX_REPAIR,
                _marker: PhantomData,
            },
        )
        .await;

        // Marshal's mailbox receives consensus activity reports.
        let reporter = marshal_mailbox.clone();

        // NOTE(review): `cfg.on_finalized` and
        // `cfg.finalized_freezer_table_initial_size` are never consumed in
        // this constructor — confirm whether they should be wired in here.
        let consensus = Consensus::new(
            context.clone(),
            simplex::Config {
                epoch: EPOCH,
                namespace: NAMESPACE.to_vec(),
                scheme,
                // The application mailbox plays both automaton and relay roles.
                automaton: application_mailbox.clone(),
                relay: application_mailbox.clone(),
                reporter,
                partition: format!("{}-consensus", cfg.partition_prefix),
                mailbox_size: cfg.mailbox_size,
                leader_timeout: cfg.leader_timeout,
                notarization_timeout: cfg.notarization_timeout,
                nullify_retry: cfg.nullify_retry,
                fetch_timeout: cfg.fetch_timeout,
                activity_timeout: cfg.activity_timeout,
                skip_timeout: cfg.skip_timeout,
                max_fetch_count: cfg.max_fetch_count,
                fetch_concurrent: cfg.fetch_concurrent,
                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
                blocker: cfg.blocker,
                buffer_pool,
            },
        );

        info!(
            participants = cfg.participants.len(),
            "created Simplex BFT consensus engine"
        );

        Self {
            context,
            application,
            application_mailbox,
            buffer,
            buffer_mailbox,
            marshal,
            marshal_mailbox,
            consensus,
        }
    }

    /// Spawn the engine onto the runtime and return its task handle.
    ///
    /// Each tuple is a (sender, receiver) pair for one logical p2p channel:
    /// `pending`, `recovered`, and `resolver` are handed to the simplex core,
    /// `broadcast` to the buffered broadcast engine, and `marshal` carries
    /// the backfill handler channel plus its resolver.
    #[allow(clippy::too_many_arguments)]
    pub fn start(
        self,
        pending: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        recovered: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        broadcast: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        marshal: (
            mpsc::Receiver<handler::Message<SimplexBlock>>,
            impl Resolver<Key = handler::Request<SimplexBlock>>,
        ),
    ) -> Handle<()> {
        let context = self.context.clone();
        context.spawn(move |_| async move {
            self.run(pending, recovered, resolver, broadcast, marshal)
                .await;
        })
    }

    /// Start every component actor and wait for all of their handles.
    ///
    /// `try_join_all` aborts on the first failed handle, which is logged as
    /// an error; if all actors stop cleanly we log a warning, since the
    /// engine is expected to run indefinitely.
    #[allow(clippy::too_many_arguments)]
    async fn run(
        self,
        pending: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        recovered: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        broadcast: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        marshal: (
            mpsc::Receiver<handler::Message<SimplexBlock>>,
            impl Resolver<Key = handler::Request<SimplexBlock>>,
        ),
    ) {
        // Application loop runs with access to marshal's mailbox.
        let application_handle = self.context.spawn({
            let application = self.application;
            let marshal_mailbox = self.marshal_mailbox.clone();
            move |_| async move {
                application.run(marshal_mailbox).await;
            }
        });

        // Block dissemination over the broadcast channel pair.
        let buffer_handle = self.buffer.start(broadcast);

        // Marshal is started with the application and buffer mailboxes plus
        // the backfill handler channel and resolver.
        let marshal_handle =
            self.marshal
                .start(self.application_mailbox, self.buffer_mailbox, marshal);

        // Simplex core runs over the pending/recovered/resolver channels.
        let consensus_handle = self.consensus.start(pending, recovered, resolver);

        if let Err(e) = try_join_all(vec![
            application_handle,
            buffer_handle,
            marshal_handle,
            consensus_handle,
        ])
        .await
        {
            error!(?e, "consensus engine failed");
        } else {
            warn!("consensus engine stopped");
        }
    }
}
429
/// Point-in-time metrics snapshot for the engine.
///
/// NOTE(review): nothing in this file populates these fields — presumably a
/// caller fills them from runtime state; confirm against the metrics path.
#[derive(Debug, Clone, Default)]
pub struct EngineMetrics {
    /// Current consensus view number.
    pub view: u64,
    /// Height of the latest finalized block.
    pub finalized_height: u64,
    /// Count of transactions waiting to be proposed.
    pub pending_transactions: usize,
    /// Whether this node is the leader of the current view.
    pub is_leader: bool,
}