1use crate::{
2 aggregator, application,
3 indexer::Indexer,
4 seeder,
5 supervisor::{EpochSupervisor, ViewSupervisor},
6};
7use battleware_types::{Activity, Block, Evaluation, NAMESPACE};
8use commonware_broadcast::buffered;
9use commonware_consensus::{
10 aggregation, marshal,
11 threshold_simplex::{self, Engine as Consensus},
12 Reporters,
13};
14use commonware_cryptography::{
15 bls12381::primitives::{
16 group,
17 poly::{public, Poly},
18 variant::MinSig,
19 },
20 ed25519::{PrivateKey, PublicKey},
21 sha256::Digest,
22 Signer,
23};
24use commonware_p2p::{Blocker, Receiver, Sender};
25use commonware_runtime::{buffer::PoolRef, Clock, Handle, Metrics, Spawner, Storage};
26use commonware_utils::{NZDuration, NZUsize, NZU64};
27use futures::future::try_join_all;
28use governor::clock::Clock as GClock;
29use governor::Quota;
30use rand::{CryptoRng, Rng};
31use std::{
32 num::{NonZero, NonZeroUsize},
33 time::Duration,
34};
35use tracing::{error, warn};
36
37type Reporter = Reporters<Activity, marshal::Mailbox<MinSig, Block>, seeder::Mailbox>;
39
40const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;
43const PRUNABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(4_096);
44const IMMUTABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(262_144);
45const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;
46const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16); const FREEZER_JOURNAL_TARGET_SIZE: u64 = 1024 * 1024 * 1024; const FREEZER_JOURNAL_COMPRESSION: Option<u8> = Some(3);
49const MMR_ITEMS_PER_BLOB: NonZero<u64> = NZU64!(128_000);
50const LOG_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(64_000);
51const LOCATIONS_ITEMS_PER_BLOB: NonZero<u64> = NZU64!(128_000);
52const CERTIFICATES_ITEMS_PER_BLOB: NonZero<u64> = NZU64!(128_000);
53const CACHE_ITEMS_PER_BLOB: NonZero<u64> = NZU64!(256);
54const REPLAY_BUFFER: NonZero<usize> = NZUsize!(8 * 1024 * 1024); const WRITE_BUFFER: NonZero<usize> = NZUsize!(1024 * 1024); const MAX_REPAIR: u64 = 20;
57
58pub struct Config<B: Blocker<PublicKey = PublicKey>, I: Indexer> {
60 pub blocker: B,
61 pub partition_prefix: String,
62 pub blocks_freezer_table_initial_size: u32,
63 pub finalized_freezer_table_initial_size: u32,
64 pub buffer_pool_page_size: NonZeroUsize,
65 pub buffer_pool_capacity: NonZeroUsize,
66 pub signer: PrivateKey,
67 pub polynomial: Poly<Evaluation>,
68 pub share: group::Share,
69 pub participants: Vec<PublicKey>,
70 pub mailbox_size: usize,
71 pub backfill_quota: Quota,
72 pub deque_size: usize,
73
74 pub leader_timeout: Duration,
75 pub notarization_timeout: Duration,
76 pub nullify_retry: Duration,
77 pub fetch_timeout: Duration,
78 pub activity_timeout: u64,
79 pub skip_timeout: u64,
80 pub max_fetch_count: usize,
81 pub max_fetch_size: usize,
82 pub fetch_concurrent: usize,
83 pub fetch_rate_per_peer: Quota,
84
85 pub indexer: I,
86 pub execution_concurrency: usize,
87 pub max_uploads_outstanding: usize,
88}
89
90pub struct Engine<
92 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
93 B: Blocker<PublicKey = PublicKey>,
94 I: Indexer,
95> {
96 context: E,
97
98 application: application::Actor<E, I>,
99 application_mailbox: application::Mailbox<E>,
100 seeder: seeder::Actor<E, I>,
101 seeder_mailbox: seeder::Mailbox,
102 aggregator: aggregator::Actor<E, I>,
103 aggregator_mailbox: aggregator::Mailbox,
104 buffer: buffered::Engine<E, PublicKey, Block>,
105 buffer_mailbox: buffered::Mailbox<PublicKey, Block>,
106 marshal: marshal::Actor<Block, E, MinSig, PublicKey, ViewSupervisor>,
107 marshal_mailbox: marshal::Mailbox<MinSig, Block>,
108
109 #[allow(clippy::type_complexity)]
110 consensus: Consensus<
111 E,
112 PrivateKey,
113 B,
114 MinSig,
115 Digest,
116 application::Mailbox<E>,
117 application::Mailbox<E>,
118 Reporter,
119 ViewSupervisor,
120 >,
121 aggregation: aggregation::Engine<
122 E,
123 PublicKey,
124 MinSig,
125 Digest,
126 aggregator::Mailbox,
127 aggregator::Mailbox,
128 EpochSupervisor,
129 B,
130 EpochSupervisor,
131 >,
132}
133
134impl<
135 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
136 B: Blocker<PublicKey = PublicKey>,
137 I: Indexer,
138 > Engine<E, B, I>
139{
140 pub async fn new(context: E, cfg: Config<B, I>) -> Self {
142 let buffer_pool = PoolRef::new(cfg.buffer_pool_page_size, cfg.buffer_pool_capacity);
144
145 let identity = *public::<MinSig>(&cfg.polynomial);
147 let (application, view_supervisor, epoch_supervisor, application_mailbox) =
148 application::Actor::new(
149 context.with_label("application"),
150 application::Config {
151 participants: cfg.participants.clone(),
152 polynomial: cfg.polynomial.clone(),
153 share: cfg.share.clone(),
154 mailbox_size: cfg.mailbox_size,
155 partition_prefix: format!("{}-application", cfg.partition_prefix),
156 mmr_items_per_blob: MMR_ITEMS_PER_BLOB,
157 mmr_write_buffer: WRITE_BUFFER,
158 log_items_per_section: LOG_ITEMS_PER_SECTION,
159 log_write_buffer: WRITE_BUFFER,
160 locations_items_per_blob: LOCATIONS_ITEMS_PER_BLOB,
161 buffer_pool: buffer_pool.clone(),
162 indexer: cfg.indexer.clone(),
163 execution_concurrency: cfg.execution_concurrency,
164 },
165 );
166
167 let (seeder, seeder_mailbox) = seeder::Actor::new(
169 context.with_label("seeder"),
170 seeder::Config {
171 indexer: cfg.indexer.clone(),
172 identity,
173 supervisor: view_supervisor.clone(),
174 namespace: NAMESPACE.to_vec(),
175 public_key: cfg.signer.public_key(),
176 backfill_quota: cfg.backfill_quota,
177 mailbox_size: cfg.mailbox_size,
178 partition_prefix: format!("{}-seeder", cfg.partition_prefix),
179 items_per_blob: MMR_ITEMS_PER_BLOB,
180 write_buffer: WRITE_BUFFER,
181 replay_buffer: REPLAY_BUFFER,
182 max_uploads_outstanding: cfg.max_uploads_outstanding,
183 },
184 );
185
186 let (aggregator, aggregator_mailbox) = aggregator::Actor::new(
188 context.with_label("aggregator"),
189 aggregator::Config {
190 identity,
191 supervisor: view_supervisor.clone(),
192 namespace: NAMESPACE.to_vec(),
193 public_key: cfg.signer.public_key(),
194 backfill_quota: cfg.backfill_quota,
195 mailbox_size: cfg.mailbox_size,
196 partition: format!("{}-aggregator", cfg.partition_prefix),
197 buffer_pool: buffer_pool.clone(),
198 prunable_items_per_blob: CACHE_ITEMS_PER_BLOB,
199 persistent_items_per_blob: CERTIFICATES_ITEMS_PER_BLOB,
200 write_buffer: WRITE_BUFFER,
201 replay_buffer: REPLAY_BUFFER,
202 indexer: cfg.indexer.clone(),
203 max_uploads_outstanding: cfg.max_uploads_outstanding,
204 },
205 );
206
207 let (buffer, buffer_mailbox) = buffered::Engine::new(
209 context.with_label("buffer"),
210 buffered::Config {
211 public_key: cfg.signer.public_key(),
212 mailbox_size: cfg.mailbox_size,
213 deque_size: cfg.deque_size,
214 priority: true,
215 codec_config: (),
216 },
217 );
218
219 let (marshal, marshal_mailbox): (_, marshal::Mailbox<MinSig, Block>) =
221 marshal::Actor::init(
222 context.with_label("marshal"),
223 marshal::Config {
224 public_key: cfg.signer.public_key(),
225 identity,
226 coordinator: view_supervisor.clone(),
227 partition_prefix: format!("{}-marshal", cfg.partition_prefix),
228 mailbox_size: cfg.mailbox_size,
229 backfill_quota: cfg.backfill_quota,
230 view_retention_timeout: cfg
231 .activity_timeout
232 .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
233 namespace: NAMESPACE.to_vec(),
234 prunable_items_per_section: PRUNABLE_ITEMS_PER_SECTION,
235 immutable_items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
236 freezer_table_initial_size: cfg.blocks_freezer_table_initial_size,
237 freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
238 freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
239 freezer_journal_target_size: FREEZER_JOURNAL_TARGET_SIZE,
240 freezer_journal_compression: FREEZER_JOURNAL_COMPRESSION,
241 replay_buffer: REPLAY_BUFFER,
242 write_buffer: WRITE_BUFFER,
243 freezer_journal_buffer_pool: buffer_pool.clone(),
244 codec_config: (),
245 max_repair: MAX_REPAIR,
246 },
247 )
248 .await;
249
250 let reporter = (marshal_mailbox.clone(), seeder_mailbox.clone()).into();
252
253 let consensus = Consensus::new(
255 context.with_label("consensus"),
256 threshold_simplex::Config {
257 namespace: NAMESPACE.to_vec(),
258 crypto: cfg.signer,
259 automaton: application_mailbox.clone(),
260 relay: application_mailbox.clone(),
261 reporter,
262 supervisor: view_supervisor,
263 partition: format!("{}-consensus", cfg.partition_prefix),
264 mailbox_size: cfg.mailbox_size,
265 leader_timeout: cfg.leader_timeout,
266 notarization_timeout: cfg.notarization_timeout,
267 nullify_retry: cfg.nullify_retry,
268 fetch_timeout: cfg.fetch_timeout,
269 activity_timeout: cfg.activity_timeout,
270 skip_timeout: cfg.skip_timeout,
271 max_fetch_count: cfg.max_fetch_count,
272 fetch_concurrent: cfg.fetch_concurrent,
273 fetch_rate_per_peer: cfg.fetch_rate_per_peer,
274 replay_buffer: REPLAY_BUFFER,
275 write_buffer: WRITE_BUFFER,
276 buffer_pool: buffer_pool.clone(),
277 blocker: cfg.blocker.clone(),
278 },
279 );
280
281 let aggregation = aggregation::Engine::new(
283 context.with_label("aggregation"),
284 aggregation::Config {
285 monitor: epoch_supervisor.clone(),
286 validators: epoch_supervisor,
287 automaton: aggregator_mailbox.clone(),
288 reporter: aggregator_mailbox.clone(),
289 blocker: cfg.blocker,
290 namespace: NAMESPACE.to_vec(),
291 priority_acks: false,
292 rebroadcast_timeout: NZDuration!(Duration::from_secs(10)),
293 epoch_bounds: (0, 0),
294 window: NZU64!(16),
295 activity_timeout: cfg.activity_timeout,
296 journal_partition: format!("{}-aggregation", cfg.partition_prefix),
297 journal_write_buffer: WRITE_BUFFER,
298 journal_replay_buffer: REPLAY_BUFFER,
299 journal_heights_per_section: NZU64!(16_384),
300 journal_compression: None,
301 journal_buffer_pool: buffer_pool,
302 },
303 );
304
305 Self {
307 context,
308
309 application,
310 application_mailbox,
311 seeder,
312 seeder_mailbox,
313 buffer,
314 buffer_mailbox,
315 marshal,
316 marshal_mailbox,
317 consensus,
318 aggregator,
319 aggregator_mailbox,
320 aggregation,
321 }
322 }
323
324 #[allow(clippy::too_many_arguments)]
326 pub fn start(
327 self,
328 pending_network: (
329 impl Sender<PublicKey = PublicKey>,
330 impl Receiver<PublicKey = PublicKey>,
331 ),
332 recovered_network: (
333 impl Sender<PublicKey = PublicKey>,
334 impl Receiver<PublicKey = PublicKey>,
335 ),
336 resolver_network: (
337 impl Sender<PublicKey = PublicKey>,
338 impl Receiver<PublicKey = PublicKey>,
339 ),
340 broadcast_network: (
341 impl Sender<PublicKey = PublicKey>,
342 impl Receiver<PublicKey = PublicKey>,
343 ),
344 backfill_network: (
345 impl Sender<PublicKey = PublicKey>,
346 impl Receiver<PublicKey = PublicKey>,
347 ),
348 seeder_network: (
349 impl Sender<PublicKey = PublicKey>,
350 impl Receiver<PublicKey = PublicKey>,
351 ),
352 aggregator_network: (
353 impl Sender<PublicKey = PublicKey>,
354 impl Receiver<PublicKey = PublicKey>,
355 ),
356 aggregation_network: (
357 impl Sender<PublicKey = PublicKey>,
358 impl Receiver<PublicKey = PublicKey>,
359 ),
360 ) -> Handle<()> {
361 self.context.clone().spawn(|_| {
362 self.run(
363 pending_network,
364 recovered_network,
365 resolver_network,
366 broadcast_network,
367 backfill_network,
368 seeder_network,
369 aggregator_network,
370 aggregation_network,
371 )
372 })
373 }
374
375 #[allow(clippy::too_many_arguments)]
376 async fn run(
377 self,
378 pending_network: (
379 impl Sender<PublicKey = PublicKey>,
380 impl Receiver<PublicKey = PublicKey>,
381 ),
382 recovered_network: (
383 impl Sender<PublicKey = PublicKey>,
384 impl Receiver<PublicKey = PublicKey>,
385 ),
386 resolver_network: (
387 impl Sender<PublicKey = PublicKey>,
388 impl Receiver<PublicKey = PublicKey>,
389 ),
390 broadcast_network: (
391 impl Sender<PublicKey = PublicKey>,
392 impl Receiver<PublicKey = PublicKey>,
393 ),
394 backfill_network: (
395 impl Sender<PublicKey = PublicKey>,
396 impl Receiver<PublicKey = PublicKey>,
397 ),
398 seeder_network: (
399 impl Sender<PublicKey = PublicKey>,
400 impl Receiver<PublicKey = PublicKey>,
401 ),
402 aggregator_network: (
403 impl Sender<PublicKey = PublicKey>,
404 impl Receiver<PublicKey = PublicKey>,
405 ),
406 aggregation_network: (
407 impl Sender<PublicKey = PublicKey>,
408 impl Receiver<PublicKey = PublicKey>,
409 ),
410 ) {
411 let seeder_handle = self.seeder.start(seeder_network);
417
418 let aggregation_handle = self.aggregation.start(aggregation_network);
420
421 let aggregator_handle = self.aggregator.start(aggregator_network);
423
424 let buffer_handle = self.buffer.start(broadcast_network);
426
427 let application_handle = self.application.start(
429 self.marshal_mailbox,
430 self.seeder_mailbox,
431 self.aggregator_mailbox,
432 );
433
434 let marshal_handle = self.marshal.start(
436 self.application_mailbox,
437 self.buffer_mailbox,
438 backfill_network,
439 );
440
441 let consensus_handle =
443 self.consensus
444 .start(pending_network, recovered_network, resolver_network);
445
446 if let Err(e) = try_join_all(vec![
448 seeder_handle,
449 aggregation_handle,
450 aggregator_handle,
451 buffer_handle,
452 application_handle,
453 marshal_handle,
454 consensus_handle,
455 ])
456 .await
457 {
458 error!(?e, "engine failed");
459 } else {
460 warn!("engine stopped");
461 }
462 }
463}