//! battleware_node/engine.rs

1use crate::{
2    aggregator, application,
3    indexer::Indexer,
4    seeder,
5    supervisor::{EpochSupervisor, ViewSupervisor},
6};
7use battleware_types::{Activity, Block, Evaluation, NAMESPACE};
8use commonware_broadcast::buffered;
9use commonware_consensus::{
10    aggregation, marshal,
11    threshold_simplex::{self, Engine as Consensus},
12    Reporters,
13};
14use commonware_cryptography::{
15    bls12381::primitives::{
16        group,
17        poly::{public, Poly},
18        variant::MinSig,
19    },
20    ed25519::{PrivateKey, PublicKey},
21    sha256::Digest,
22    Signer,
23};
24use commonware_p2p::{Blocker, Receiver, Sender};
25use commonware_runtime::{buffer::PoolRef, Clock, Handle, Metrics, Spawner, Storage};
26use commonware_utils::{NZDuration, NZUsize, NZU64};
27use futures::future::try_join_all;
28use governor::clock::Clock as GClock;
29use governor::Quota;
30use rand::{CryptoRng, Rng};
31use std::{
32    num::{NonZero, NonZeroUsize},
33    time::Duration,
34};
35use tracing::{error, warn};
36
/// Reporter type for [threshold_simplex::Engine]: fans consensus [Activity] out to
/// both the marshal and the seeder mailboxes.
type Reporter = Reporters<Activity, marshal::Mailbox<MinSig, Block>, seeder::Mailbox>;

/// To better support peers near tip during network instability, we multiply
/// the consensus activity timeout by this factor.
const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;
/// Items per section in the marshal's prunable store.
const PRUNABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(4_096);
/// Items per section in the marshal's immutable store.
const IMMUTABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(262_144);
/// How often the marshal freezer table is resized (units defined by commonware).
const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;
// 2^16 = 65,536 entries per resize chunk. NOTE(review): the original "3MB" label
// implies ~48 bytes/entry — confirm against the freezer's entry size.
const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16); // ~3MB — TODO confirm
/// Target size of each freezer journal. 1GB
const FREEZER_JOURNAL_TARGET_SIZE: u64 = 1024 * 1024 * 1024; // 1GB
/// Compression level for freezer journals (algorithm chosen by commonware).
const FREEZER_JOURNAL_COMPRESSION: Option<u8> = Some(3);
/// Items per blob for MMR storage (application and seeder).
const MMR_ITEMS_PER_BLOB: NonZero<u64> = NZU64!(128_000);
/// Items per section in the application's operation log.
const LOG_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(64_000);
/// Items per blob in the application's locations index.
const LOCATIONS_ITEMS_PER_BLOB: NonZero<u64> = NZU64!(128_000);
/// Items per blob in the aggregator's persistent certificate store.
const CERTIFICATES_ITEMS_PER_BLOB: NonZero<u64> = NZU64!(128_000);
/// Items per blob in the aggregator's prunable cache.
const CACHE_ITEMS_PER_BLOB: NonZero<u64> = NZU64!(256);
/// Read buffer used when replaying journals on startup. 8MB
const REPLAY_BUFFER: NonZero<usize> = NZUsize!(8 * 1024 * 1024); // 8MB
/// Write buffer shared by all storage-backed actors. 1MB
const WRITE_BUFFER: NonZero<usize> = NZUsize!(1024 * 1024); // 1MB
/// Maximum number of blocks the marshal will repair at once.
const MAX_REPAIR: u64 = 20;
57
58/// Configuration for the [Engine].
/// Configuration for the [Engine].
pub struct Config<B: Blocker<PublicKey = PublicKey>, I: Indexer> {
    /// Peer blocker handed to the consensus and aggregation engines.
    pub blocker: B,
    /// Prefix applied to every actor's storage partition name.
    pub partition_prefix: String,
    /// Initial size of the marshal's block freezer table.
    pub blocks_freezer_table_initial_size: u32,
    // NOTE(review): this field is never read in [Engine::new] — confirm whether it
    // is consumed elsewhere or is dead configuration.
    pub finalized_freezer_table_initial_size: u32,
    /// Page size of the shared storage buffer pool.
    pub buffer_pool_page_size: NonZeroUsize,
    /// Capacity (in pages) of the shared storage buffer pool.
    pub buffer_pool_capacity: NonZeroUsize,
    /// This node's ed25519 key; signs consensus messages and derives the public key
    /// advertised to the seeder, aggregator, buffer, and marshal.
    pub signer: PrivateKey,
    /// Public threshold polynomial; the network identity is `public::<MinSig>(&polynomial)`.
    pub polynomial: Poly<Evaluation>,
    /// This node's secret share of the threshold key (given to the application).
    pub share: group::Share,
    /// Validator public keys (given to the application).
    pub participants: Vec<PublicKey>,
    /// Capacity used for every actor mailbox.
    pub mailbox_size: usize,
    /// Rate limit for backfill requests (seeder, aggregator, and marshal).
    pub backfill_quota: Quota,
    /// Deque size of the buffered broadcast engine.
    pub deque_size: usize,

    // Consensus (threshold_simplex) tuning; forwarded verbatim unless noted.
    pub leader_timeout: Duration,
    pub notarization_timeout: Duration,
    pub nullify_retry: Duration,
    pub fetch_timeout: Duration,
    /// Consensus activity timeout (in views); also scaled by
    /// SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER for the marshal's view retention, and
    /// reused by the aggregation engine.
    pub activity_timeout: u64,
    pub skip_timeout: u64,
    pub max_fetch_count: usize,
    // NOTE(review): not forwarded to any sub-config in [Engine::new] — confirm
    // whether threshold_simplex expects it or the field is unused.
    pub max_fetch_size: usize,
    pub fetch_concurrent: usize,
    pub fetch_rate_per_peer: Quota,

    /// Indexer cloned into the application, seeder, and aggregator.
    pub indexer: I,
    /// Application execution parallelism.
    pub execution_concurrency: usize,
    /// Cap on concurrent uploads for the seeder and aggregator.
    pub max_uploads_outstanding: usize,
}
89
90/// The engine that drives the [application].
/// The engine that drives the [application].
///
/// Holds every actor and its mailbox between construction ([Engine::new]) and
/// startup ([Engine::start]); actors are consumed when spawned in `run`.
pub struct Engine<
    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
    B: Blocker<PublicKey = PublicKey>,
    I: Indexer,
> {
    // Runtime context used to spawn the root task.
    context: E,

    // Actors paired with the mailboxes other actors use to reach them.
    application: application::Actor<E, I>,
    application_mailbox: application::Mailbox<E>,
    seeder: seeder::Actor<E, I>,
    seeder_mailbox: seeder::Mailbox,
    aggregator: aggregator::Actor<E, I>,
    aggregator_mailbox: aggregator::Mailbox,
    buffer: buffered::Engine<E, PublicKey, Block>,
    buffer_mailbox: buffered::Mailbox<PublicKey, Block>,
    marshal: marshal::Actor<Block, E, MinSig, PublicKey, ViewSupervisor>,
    marshal_mailbox: marshal::Mailbox<MinSig, Block>,

    // Consensus engine: the application mailbox serves as both automaton and
    // relay; activity is reported to the marshal and seeder (see [Reporter]).
    #[allow(clippy::type_complexity)]
    consensus: Consensus<
        E,
        PrivateKey,
        B,
        MinSig,
        Digest,
        application::Mailbox<E>,
        application::Mailbox<E>,
        Reporter,
        ViewSupervisor,
    >,
    // Aggregation engine: the aggregator mailbox serves as both automaton and
    // reporter; the epoch supervisor is both monitor and validator set.
    aggregation: aggregation::Engine<
        E,
        PublicKey,
        MinSig,
        Digest,
        aggregator::Mailbox,
        aggregator::Mailbox,
        EpochSupervisor,
        B,
        EpochSupervisor,
    >,
}
133
impl<
        E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
        B: Blocker<PublicKey = PublicKey>,
        I: Indexer,
    > Engine<E, B, I>
{
    /// Create a new [Engine].
    ///
    /// Constructs every actor (application, seeder, aggregator, broadcast buffer,
    /// marshal, consensus, aggregation) and wires their mailboxes together.
    /// Nothing is spawned here — call [Engine::start] to run the actors.
    pub async fn new(context: E, cfg: Config<B, I>) -> Self {
        // Create the buffer pool (shared by all storage-backed actors)
        let buffer_pool = PoolRef::new(cfg.buffer_pool_page_size, cfg.buffer_pool_capacity);

        // Create the application. The network identity is the group public key
        // derived from the threshold polynomial; the application also yields the
        // view/epoch supervisors consumed by the other actors below.
        let identity = *public::<MinSig>(&cfg.polynomial);
        let (application, view_supervisor, epoch_supervisor, application_mailbox) =
            application::Actor::new(
                context.with_label("application"),
                application::Config {
                    participants: cfg.participants.clone(),
                    polynomial: cfg.polynomial.clone(),
                    share: cfg.share.clone(),
                    mailbox_size: cfg.mailbox_size,
                    partition_prefix: format!("{}-application", cfg.partition_prefix),
                    mmr_items_per_blob: MMR_ITEMS_PER_BLOB,
                    mmr_write_buffer: WRITE_BUFFER,
                    log_items_per_section: LOG_ITEMS_PER_SECTION,
                    log_write_buffer: WRITE_BUFFER,
                    locations_items_per_blob: LOCATIONS_ITEMS_PER_BLOB,
                    buffer_pool: buffer_pool.clone(),
                    indexer: cfg.indexer.clone(),
                    execution_concurrency: cfg.execution_concurrency,
                },
            );

        // Create the seeder
        let (seeder, seeder_mailbox) = seeder::Actor::new(
            context.with_label("seeder"),
            seeder::Config {
                indexer: cfg.indexer.clone(),
                identity,
                supervisor: view_supervisor.clone(),
                namespace: NAMESPACE.to_vec(),
                public_key: cfg.signer.public_key(),
                backfill_quota: cfg.backfill_quota,
                mailbox_size: cfg.mailbox_size,
                partition_prefix: format!("{}-seeder", cfg.partition_prefix),
                items_per_blob: MMR_ITEMS_PER_BLOB,
                write_buffer: WRITE_BUFFER,
                replay_buffer: REPLAY_BUFFER,
                max_uploads_outstanding: cfg.max_uploads_outstanding,
            },
        );

        // Create the aggregator
        let (aggregator, aggregator_mailbox) = aggregator::Actor::new(
            context.with_label("aggregator"),
            aggregator::Config {
                identity,
                supervisor: view_supervisor.clone(),
                namespace: NAMESPACE.to_vec(),
                public_key: cfg.signer.public_key(),
                backfill_quota: cfg.backfill_quota,
                mailbox_size: cfg.mailbox_size,
                partition: format!("{}-aggregator", cfg.partition_prefix),
                buffer_pool: buffer_pool.clone(),
                prunable_items_per_blob: CACHE_ITEMS_PER_BLOB,
                persistent_items_per_blob: CERTIFICATES_ITEMS_PER_BLOB,
                write_buffer: WRITE_BUFFER,
                replay_buffer: REPLAY_BUFFER,
                indexer: cfg.indexer.clone(),
                max_uploads_outstanding: cfg.max_uploads_outstanding,
            },
        );

        // Create the buffer (buffered broadcast of blocks)
        let (buffer, buffer_mailbox) = buffered::Engine::new(
            context.with_label("buffer"),
            buffered::Config {
                public_key: cfg.signer.public_key(),
                mailbox_size: cfg.mailbox_size,
                deque_size: cfg.deque_size,
                priority: true,
                codec_config: (),
            },
        );

        // Create marshal. Views are retained for an extended window
        // (activity_timeout * SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER) to better
        // serve peers near tip during network instability.
        let (marshal, marshal_mailbox): (_, marshal::Mailbox<MinSig, Block>) =
            marshal::Actor::init(
                context.with_label("marshal"),
                marshal::Config {
                    public_key: cfg.signer.public_key(),
                    identity,
                    coordinator: view_supervisor.clone(),
                    partition_prefix: format!("{}-marshal", cfg.partition_prefix),
                    mailbox_size: cfg.mailbox_size,
                    backfill_quota: cfg.backfill_quota,
                    view_retention_timeout: cfg
                        .activity_timeout
                        .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
                    namespace: NAMESPACE.to_vec(),
                    prunable_items_per_section: PRUNABLE_ITEMS_PER_SECTION,
                    immutable_items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
                    freezer_table_initial_size: cfg.blocks_freezer_table_initial_size,
                    freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
                    freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
                    freezer_journal_target_size: FREEZER_JOURNAL_TARGET_SIZE,
                    freezer_journal_compression: FREEZER_JOURNAL_COMPRESSION,
                    replay_buffer: REPLAY_BUFFER,
                    write_buffer: WRITE_BUFFER,
                    freezer_journal_buffer_pool: buffer_pool.clone(),
                    codec_config: (),
                    max_repair: MAX_REPAIR,
                },
            )
            .await;

        // Create the reporter that fans consensus activity out to the marshal
        // and seeder (see [Reporter]).
        let reporter = (marshal_mailbox.clone(), seeder_mailbox.clone()).into();

        // Create the consensus engine. The application mailbox doubles as both
        // automaton and relay.
        // NOTE(review): cfg.max_fetch_size is not forwarded here (or anywhere in
        // this constructor) — confirm whether it is intentionally unused.
        let consensus = Consensus::new(
            context.with_label("consensus"),
            threshold_simplex::Config {
                namespace: NAMESPACE.to_vec(),
                crypto: cfg.signer,
                automaton: application_mailbox.clone(),
                relay: application_mailbox.clone(),
                reporter,
                supervisor: view_supervisor,
                partition: format!("{}-consensus", cfg.partition_prefix),
                mailbox_size: cfg.mailbox_size,
                leader_timeout: cfg.leader_timeout,
                notarization_timeout: cfg.notarization_timeout,
                nullify_retry: cfg.nullify_retry,
                fetch_timeout: cfg.fetch_timeout,
                activity_timeout: cfg.activity_timeout,
                skip_timeout: cfg.skip_timeout,
                max_fetch_count: cfg.max_fetch_count,
                fetch_concurrent: cfg.fetch_concurrent,
                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
                buffer_pool: buffer_pool.clone(),
                blocker: cfg.blocker.clone(),
            },
        );

        // Create the aggregation engine. The aggregator mailbox doubles as both
        // automaton and reporter; the epoch supervisor serves as both monitor
        // and validator set.
        let aggregation = aggregation::Engine::new(
            context.with_label("aggregation"),
            aggregation::Config {
                monitor: epoch_supervisor.clone(),
                validators: epoch_supervisor,
                automaton: aggregator_mailbox.clone(),
                reporter: aggregator_mailbox.clone(),
                blocker: cfg.blocker,
                namespace: NAMESPACE.to_vec(),
                priority_acks: false,
                rebroadcast_timeout: NZDuration!(Duration::from_secs(10)),
                epoch_bounds: (0, 0),
                window: NZU64!(16),
                activity_timeout: cfg.activity_timeout,
                journal_partition: format!("{}-aggregation", cfg.partition_prefix),
                journal_write_buffer: WRITE_BUFFER,
                journal_replay_buffer: REPLAY_BUFFER,
                journal_heights_per_section: NZU64!(16_384),
                journal_compression: None,
                journal_buffer_pool: buffer_pool,
            },
        );

        // Return the engine
        Self {
            context,

            application,
            application_mailbox,
            seeder,
            seeder_mailbox,
            buffer,
            buffer_mailbox,
            marshal,
            marshal_mailbox,
            consensus,
            aggregator,
            aggregator_mailbox,
            aggregation,
        }
    }

    /// Start the [Engine], spawning [Self::run] on the runtime.
    ///
    /// Each `(Sender, Receiver)` pair is a dedicated p2p channel for one actor
    /// (consensus uses the pending/recovered/resolver trio). The returned
    /// [Handle] resolves when the engine stops.
    #[allow(clippy::too_many_arguments)]
    pub fn start(
        self,
        pending_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        recovered_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        broadcast_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        backfill_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        seeder_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        aggregator_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        aggregation_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> Handle<()> {
        self.context.clone().spawn(|_| {
            self.run(
                pending_network,
                recovered_network,
                resolver_network,
                broadcast_network,
                backfill_network,
                seeder_network,
                aggregator_network,
                aggregation_network,
            )
        })
    }

    /// Start all actors (downstream before upstream) and wait until any exits.
    #[allow(clippy::too_many_arguments)]
    async fn run(
        self,
        pending_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        recovered_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        broadcast_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        backfill_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        seeder_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        aggregator_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        aggregation_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        // Startup order matters: if a downstream actor is started after an upstream
        // actor (i.e. application after consensus), it is possible that restart could
        // block (as the upstream actor may fill the downstream actor's mailbox with
        // items during initialization, potentially blocking if not read).

        // Start the seeder
        let seeder_handle = self.seeder.start(seeder_network);

        // Start aggregation
        let aggregation_handle = self.aggregation.start(aggregation_network);

        // Start the aggregator
        let aggregator_handle = self.aggregator.start(aggregator_network);

        // Start the buffer
        let buffer_handle = self.buffer.start(broadcast_network);

        // Start the application
        let application_handle = self.application.start(
            self.marshal_mailbox,
            self.seeder_mailbox,
            self.aggregator_mailbox,
        );

        // Start marshal
        let marshal_handle = self.marshal.start(
            self.application_mailbox,
            self.buffer_mailbox,
            backfill_network,
        );

        // Start consensus (last: it is upstream of everything above)
        let consensus_handle =
            self.consensus
                .start(pending_network, recovered_network, resolver_network);

        // Wait for any actor to finish; try_join_all resolves on the first error
        // or once every handle completes cleanly.
        if let Err(e) = try_join_all(vec![
            seeder_handle,
            aggregation_handle,
            aggregator_handle,
            buffer_handle,
            application_handle,
            marshal_handle,
            consensus_handle,
        ])
        .await
        {
            error!(?e, "engine failed");
        } else {
            warn!("engine stopped");
        }
    }
}
463}