// alto_chain/engine.rs

use crate::{application, indexer, indexer::Indexer, StaticSchemeProvider};
use alto_types::{Activity, Block, Evaluation, Scheme, EPOCH, EPOCH_LENGTH, NAMESPACE};
use commonware_broadcast::buffered;
use commonware_consensus::{
    marshal::{self, ingress::handler},
    simplex::{self, Engine as Consensus},
    Reporters,
};
use commonware_cryptography::{
    bls12381::primitives::{group, poly::Poly},
    ed25519::PublicKey,
    sha256::Digest,
};
use commonware_p2p::{Blocker, Receiver, Sender};
use commonware_resolver::Resolver;
use commonware_runtime::{
    buffer::PoolRef, spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
};
use commonware_utils::set::Ordered;
use commonware_utils::{NZUsize, NZU64};
use futures::{channel::mpsc, future::try_join_all};
use governor::clock::Clock as GClock;
use governor::Quota;
use rand::{CryptoRng, Rng};
use std::marker::PhantomData;
use std::{num::NonZero, time::Duration};
use tracing::{error, warn};

/// Reporter type for [simplex::Engine].
type Reporter<E, I> =
    Reporters<Activity, marshal::Mailbox<Scheme, Block>, Option<indexer::Pusher<E, I>>>;

/// To better support peers near tip during network instability, we multiply
/// the consensus activity timeout by this factor.
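/// For example, an `activity_timeout` of 256 views yields a marshal
/// `view_retention_timeout` of 2,560 views.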
const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;
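
// Storage and buffering parameters used by the marshal and consensus
// configurations below.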
const PRUNABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(4_096);
const IMMUTABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(262_144);
const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;
const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16); // 3MB
const FREEZER_JOURNAL_TARGET_SIZE: u64 = 1024 * 1024 * 1024; // 1GB
const FREEZER_JOURNAL_COMPRESSION: Option<u8> = Some(3);
const REPLAY_BUFFER: NonZero<usize> = NZUsize!(8 * 1024 * 1024); // 8MB
const WRITE_BUFFER: NonZero<usize> = NZUsize!(1024 * 1024); // 1MB
const BUFFER_POOL_PAGE_SIZE: NonZero<usize> = NZUsize!(4_096); // 4KB
const BUFFER_POOL_CAPACITY: NonZero<usize> = NZUsize!(8_192); // 32MB
const MAX_REPAIR: u64 = 20;

/// Configuration for the [Engine].
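///
/// # Example
///
/// A minimal sketch of filling in this config. Every value below is an
/// illustrative placeholder, not a tuned recommendation, and `blocker`,
/// `signer`, `polynomial`, `share`, `participants`, and `indexer` are assumed
/// to be constructed elsewhere:
///
/// ```ignore
/// let cfg = Config {
///     blocker,
///     partition_prefix: "alto".to_string(),
///     blocks_freezer_table_initial_size: 65_536,
///     finalized_freezer_table_initial_size: 65_536,
///     me: signer.public_key(),
///     polynomial,
///     share,
///     participants,
///     mailbox_size: 1_024,
///     deque_size: 10,
///     leader_timeout: Duration::from_secs(1),
///     notarization_timeout: Duration::from_secs(2),
///     nullify_retry: Duration::from_secs(10),
///     fetch_timeout: Duration::from_secs(2),
///     activity_timeout: 256,
///     skip_timeout: 32,
///     max_fetch_count: 16,
///     max_fetch_size: 512 * 1024,
///     fetch_concurrent: 4,
///     fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(128).unwrap()),
///     indexer,
/// };
/// ```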
pub struct Config<B: Blocker<PublicKey = PublicKey>, I: Indexer> {
    pub blocker: B,
    pub partition_prefix: String,
    pub blocks_freezer_table_initial_size: u32,
    pub finalized_freezer_table_initial_size: u32,
    pub me: PublicKey,
    pub polynomial: Poly<Evaluation>,
    pub share: group::Share,
    pub participants: Ordered<PublicKey>,
    pub mailbox_size: usize,
    pub deque_size: usize,

    pub leader_timeout: Duration,
    pub notarization_timeout: Duration,
    pub nullify_retry: Duration,
    pub fetch_timeout: Duration,
    pub activity_timeout: u64,
    pub skip_timeout: u64,
    pub max_fetch_count: usize,
    pub max_fetch_size: usize,
    pub fetch_concurrent: usize,
    pub fetch_rate_per_peer: Quota,

    pub indexer: Option<I>,
}

/// The engine that drives the [application].
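///
/// Holds the [application::Actor], the [buffered::Engine] used to broadcast
/// blocks, the [marshal::Actor], and the [simplex] consensus engine, all of
/// which are started together by [Engine::start].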
pub struct Engine<
    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
    B: Blocker<PublicKey = PublicKey>,
    I: Indexer,
> {
    context: ContextCell<E>,

    application: application::Actor<E>,
    application_mailbox: application::Mailbox,
    buffer: buffered::Engine<E, PublicKey, Block>,
    buffer_mailbox: buffered::Mailbox<PublicKey, Block>,
    marshal: marshal::Actor<E, Block, StaticSchemeProvider, Scheme>,
    marshal_mailbox: marshal::Mailbox<Scheme, Block>,

    consensus: Consensus<
        E,
        PublicKey,
        Scheme,
        B,
        Digest,
        application::Mailbox,
        application::Mailbox,
        Reporter<E, I>,
    >,
}

impl<
        E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
        B: Blocker<PublicKey = PublicKey>,
        I: Indexer,
    > Engine<E, B, I>
{
    /// Create a new [Engine].
    pub async fn new(context: E, cfg: Config<B, I>) -> Self {
        // Create the application
        let (application, application_mailbox) = application::Actor::new(
            context.with_label("application"),
            application::Config {
                mailbox_size: cfg.mailbox_size,
            },
        );

        // Create the buffer
        let (buffer, buffer_mailbox) = buffered::Engine::new(
            context.with_label("buffer"),
            buffered::Config {
                public_key: cfg.me,
                mailbox_size: cfg.mailbox_size,
                deque_size: cfg.deque_size,
                priority: true,
                codec_config: (),
            },
        );

        // Create the buffer pool
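        // (cloned into the marshal's freezer journal and passed to consensus below)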
        let buffer_pool = PoolRef::new(BUFFER_POOL_PAGE_SIZE, BUFFER_POOL_CAPACITY);

        // Create the signing scheme
        let scheme = Scheme::new(cfg.participants, &cfg.polynomial, cfg.share);

        // Create marshal
        let (marshal, marshal_mailbox) = marshal::Actor::init(
            context.with_label("marshal"),
            marshal::Config {
                scheme_provider: scheme.clone().into(),
                epoch_length: EPOCH_LENGTH,
                partition_prefix: cfg.partition_prefix.clone(),
                mailbox_size: cfg.mailbox_size,
                view_retention_timeout: cfg
                    .activity_timeout
                    .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
                namespace: NAMESPACE.to_vec(),
                prunable_items_per_section: PRUNABLE_ITEMS_PER_SECTION,
                immutable_items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
                freezer_table_initial_size: cfg.blocks_freezer_table_initial_size,
                freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
                freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
                freezer_journal_target_size: FREEZER_JOURNAL_TARGET_SIZE,
                freezer_journal_compression: FREEZER_JOURNAL_COMPRESSION,
                freezer_journal_buffer_pool: buffer_pool.clone(),
                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
                block_codec_config: (),
                max_repair: MAX_REPAIR,
                _marker: PhantomData,
            },
        )
        .await;

        // Create the reporter
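        // (consensus activity fans out to the marshal and, if configured, the
        // indexer pusher)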
        let reporter = (
            marshal_mailbox.clone(),
            cfg.indexer.map(|indexer| {
                indexer::Pusher::new(
                    context.with_label("indexer"),
                    indexer,
                    marshal_mailbox.clone(),
                )
            }),
        )
            .into();

        // Create the consensus engine
        let consensus = Consensus::new(
            context.with_label("consensus"),
            simplex::Config {
                epoch: EPOCH,
                namespace: NAMESPACE.to_vec(),
                scheme,
                automaton: application_mailbox.clone(),
                relay: application_mailbox.clone(),
                reporter,
                partition: format!("{}-consensus", cfg.partition_prefix),
                mailbox_size: cfg.mailbox_size,
                leader_timeout: cfg.leader_timeout,
                notarization_timeout: cfg.notarization_timeout,
                nullify_retry: cfg.nullify_retry,
                fetch_timeout: cfg.fetch_timeout,
                activity_timeout: cfg.activity_timeout,
                skip_timeout: cfg.skip_timeout,
                max_fetch_count: cfg.max_fetch_count,
                fetch_concurrent: cfg.fetch_concurrent,
                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
                blocker: cfg.blocker,
                buffer_pool,
            },
        );

        // Return the engine
        Self {
            context: ContextCell::new(context),

            application,
            application_mailbox,
            buffer,
            buffer_mailbox,
            marshal,
            marshal_mailbox,
            consensus,
        }
    }

    /// Start the [Engine].
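    ///
    /// `pending`, `recovered`, and `resolver` are handed to consensus, `broadcast`
    /// to the buffer, and `marshal` to the marshal actor.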
    #[allow(clippy::too_many_arguments)]
    pub fn start(
        mut self,
        pending: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        recovered: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        broadcast: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        marshal: (
            mpsc::Receiver<handler::Message<Block>>,
            impl Resolver<Key = handler::Request<Block>>,
        ),
    ) -> Handle<()> {
        spawn_cell!(
            self.context,
            self.run(pending, recovered, resolver, broadcast, marshal)
                .await
        )
    }

    #[allow(clippy::too_many_arguments)]
    async fn run(
        self,
        pending: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        recovered: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        broadcast: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        marshal: (
            mpsc::Receiver<handler::Message<Block>>,
            impl Resolver<Key = handler::Request<Block>>,
        ),
    ) {
        // Start the application
        let application_handle = self.application.start(self.marshal_mailbox);

        // Start the buffer
        let buffer_handle = self.buffer.start(broadcast);

        // Start marshal
        let marshal_handle =
            self.marshal
                .start(self.application_mailbox, self.buffer_mailbox, marshal);

        // Start consensus
        //
        // We start the application prior to consensus to ensure we can handle enqueued events from consensus (otherwise
        // restart could block).
        let consensus_handle = self.consensus.start(pending, recovered, resolver);

        // Wait for any actor to finish
        if let Err(e) = try_join_all(vec![
            application_handle,
            buffer_handle,
            marshal_handle,
            consensus_handle,
        ])
        .await
        {
            error!(?e, "engine failed");
        } else {
            warn!("engine stopped");
        }
    }
}