alto_chain/engine.rs

use crate::{application, indexer, indexer::Indexer, supervisor::Supervisor};
use alto_types::{Activity, Block, Evaluation, NAMESPACE};
use commonware_broadcast::buffered;
use commonware_consensus::{
    marshal,
    threshold_simplex::{self, Engine as Consensus},
    Reporters,
};
use commonware_cryptography::{
    bls12381::primitives::{
        group,
        poly::{public, Poly},
        variant::MinSig,
    },
    ed25519::{PrivateKey, PublicKey},
    sha256::Digest,
    Signer,
};
use commonware_p2p::{Blocker, Receiver, Sender};
use commonware_runtime::{buffer::PoolRef, Clock, Handle, Metrics, Spawner, Storage};
use commonware_utils::{NZUsize, NZU64};
use futures::future::try_join_all;
use governor::clock::Clock as GClock;
use governor::Quota;
use rand::{CryptoRng, Rng};
use std::{num::NonZero, time::Duration};
use tracing::{error, warn};

/// Reporter type for [threshold_simplex::Engine].
type Reporter<E, I> =
    Reporters<Activity, marshal::Mailbox<MinSig, Block>, Option<indexer::Pusher<E, I>>>;

/// To better support peers near tip during network instability, we multiply
/// the consensus activity timeout by this factor.
const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;
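
// Storage tuning forwarded to [marshal::Config] and [threshold_simplex::Config] below.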
const PRUNABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(4_096);
const IMMUTABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(262_144);
const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;
const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16); // 3MB
const FREEZER_JOURNAL_TARGET_SIZE: u64 = 1024 * 1024 * 1024; // 1GB
const FREEZER_JOURNAL_COMPRESSION: Option<u8> = Some(3);
const REPLAY_BUFFER: NonZero<usize> = NZUsize!(8 * 1024 * 1024); // 8MB
const WRITE_BUFFER: NonZero<usize> = NZUsize!(1024 * 1024); // 1MB
const BUFFER_POOL_PAGE_SIZE: NonZero<usize> = NZUsize!(4_096); // 4KB
const BUFFER_POOL_CAPACITY: NonZero<usize> = NZUsize!(8_192); // 32MB
const MAX_REPAIR: u64 = 20;

/// Configuration for the [Engine].
pub struct Config<B: Blocker<PublicKey = PublicKey>, I: Indexer> {
    pub blocker: B,
    pub partition_prefix: String,
    pub blocks_freezer_table_initial_size: u32,
    pub finalized_freezer_table_initial_size: u32,
    pub signer: PrivateKey,
    pub polynomial: Poly<Evaluation>,
    pub share: group::Share,
    pub participants: Vec<PublicKey>,
    pub mailbox_size: usize,
    pub backfill_quota: Quota,
    pub deque_size: usize,

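    // Consensus timing and fetch limits.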
    pub leader_timeout: Duration,
    pub notarization_timeout: Duration,
    pub nullify_retry: Duration,
    pub fetch_timeout: Duration,
    pub activity_timeout: u64,
    pub skip_timeout: u64,
    pub max_fetch_count: usize,
    pub max_fetch_size: usize,
    pub fetch_concurrent: usize,
    pub fetch_rate_per_peer: Quota,

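    // Optional indexer; when set, consensus activity is also reported via [indexer::Pusher].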
    pub indexer: Option<I>,
}

/// The engine that drives the [application].
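///
/// # Example
///
/// A minimal sketch of wiring an [Engine] (assuming a runtime `context`, a populated
/// [Config] `cfg`, and five `(Sender, Receiver)` pairs constructed elsewhere):
///
/// ```ignore
/// let engine = Engine::new(context, cfg).await;
/// let handle = engine.start(
///     pending_network,
///     recovered_network,
///     resolver_network,
///     broadcast_network,
///     backfill_network,
/// );
/// let _ = handle.await;
/// ```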
pub struct Engine<
    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
    B: Blocker<PublicKey = PublicKey>,
    I: Indexer,
> {
    context: E,

    application: application::Actor<E>,
    application_mailbox: application::Mailbox,
    buffer: buffered::Engine<E, PublicKey, Block>,
    buffer_mailbox: buffered::Mailbox<PublicKey, Block>,
    marshal: marshal::Actor<Block, E, MinSig, PublicKey, Supervisor>,
    marshal_mailbox: marshal::Mailbox<MinSig, Block>,

    consensus: Consensus<
        E,
        PrivateKey,
        B,
        MinSig,
        Digest,
        application::Mailbox,
        application::Mailbox,
        Reporter<E, I>,
        Supervisor,
    >,
}

impl<
        E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
        B: Blocker<PublicKey = PublicKey>,
        I: Indexer,
    > Engine<E, B, I>
{
    /// Create a new [Engine].
    pub async fn new(context: E, cfg: Config<B, I>) -> Self {
        // Create the application
        let identity = *public::<MinSig>(&cfg.polynomial);
        let (application, supervisor, application_mailbox) = application::Actor::new(
            context.with_label("application"),
            application::Config {
                participants: cfg.participants.clone(),
                polynomial: cfg.polynomial,
                share: cfg.share,
                mailbox_size: cfg.mailbox_size,
            },
        );

        // Create the buffer
        let (buffer, buffer_mailbox) = buffered::Engine::new(
            context.with_label("buffer"),
            buffered::Config {
                public_key: cfg.signer.public_key(),
                mailbox_size: cfg.mailbox_size,
                deque_size: cfg.deque_size,
                priority: true,
                codec_config: (),
            },
        );

        // Create the buffer pool (shared by the marshal freezer journal and the consensus engine)
        let buffer_pool = PoolRef::new(BUFFER_POOL_PAGE_SIZE, BUFFER_POOL_CAPACITY);

        // Create marshal
        let (marshal, marshal_mailbox): (_, marshal::Mailbox<MinSig, Block>) =
            marshal::Actor::init(
                context.with_label("marshal"),
                marshal::Config {
                    public_key: cfg.signer.public_key(),
                    identity,
                    coordinator: supervisor.clone(),
                    partition_prefix: cfg.partition_prefix.clone(),
                    mailbox_size: cfg.mailbox_size,
                    backfill_quota: cfg.backfill_quota,
                    view_retention_timeout: cfg
                        .activity_timeout
                        .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
                    namespace: NAMESPACE.to_vec(),
                    prunable_items_per_section: PRUNABLE_ITEMS_PER_SECTION,
                    immutable_items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
                    freezer_table_initial_size: cfg.blocks_freezer_table_initial_size,
                    freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
                    freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
                    freezer_journal_target_size: FREEZER_JOURNAL_TARGET_SIZE,
                    freezer_journal_compression: FREEZER_JOURNAL_COMPRESSION,
                    freezer_journal_buffer_pool: buffer_pool.clone(),
                    replay_buffer: REPLAY_BUFFER,
                    write_buffer: WRITE_BUFFER,
                    codec_config: (),
                    max_repair: MAX_REPAIR,
                },
            )
            .await;

        // Create the reporter (marshal always receives activity; the indexer pusher is
        // included only when an indexer is configured)
        let reporter = (
            marshal_mailbox.clone(),
            cfg.indexer.map(|indexer| {
                indexer::Pusher::new(
                    context.with_label("indexer"),
                    indexer,
                    marshal_mailbox.clone(),
                )
            }),
        )
            .into();

        // Create the consensus engine
        let consensus = Consensus::new(
            context.with_label("consensus"),
            threshold_simplex::Config {
                namespace: NAMESPACE.to_vec(),
                crypto: cfg.signer,
                automaton: application_mailbox.clone(),
                relay: application_mailbox.clone(),
                reporter,
                supervisor,
                partition: format!("{}-consensus", cfg.partition_prefix),
                compression: None,
                mailbox_size: cfg.mailbox_size,
                leader_timeout: cfg.leader_timeout,
                notarization_timeout: cfg.notarization_timeout,
                nullify_retry: cfg.nullify_retry,
                fetch_timeout: cfg.fetch_timeout,
                activity_timeout: cfg.activity_timeout,
                skip_timeout: cfg.skip_timeout,
                max_fetch_count: cfg.max_fetch_count,
                fetch_concurrent: cfg.fetch_concurrent,
                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
                blocker: cfg.blocker,
                buffer_pool,
            },
        );

        // Return the engine
        Self {
            context,

            application,
            application_mailbox,
            buffer,
            buffer_mailbox,
            marshal,
            marshal_mailbox,
            consensus,
        }
    }

    /// Start the [Engine] and all of its actors.
    #[allow(clippy::too_many_arguments)]
    pub fn start(
        self,
        pending_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        recovered_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        broadcast_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        backfill_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> Handle<()> {
        self.context.clone().spawn(|_| {
            self.run(
                pending_network,
                recovered_network,
                resolver_network,
                broadcast_network,
                backfill_network,
            )
        })
    }

    /// Start all actors and wait for them to exit.
    #[allow(clippy::too_many_arguments)]
    async fn run(
        self,
        pending_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        recovered_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        broadcast_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        backfill_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        // Start the application
        let application_handle = self.application.start(self.marshal_mailbox);

        // Start the buffer
        let buffer_handle = self.buffer.start(broadcast_network);

        // Start marshal
        let marshal_handle = self.marshal.start(
            self.application_mailbox,
            self.buffer_mailbox,
            backfill_network,
        );

        // Start consensus
        //
        // We start the application prior to consensus to ensure we can handle enqueued events from consensus (otherwise
        // restart could block).
        let consensus_handle =
            self.consensus
                .start(pending_network, recovered_network, resolver_network);

        // Wait for the actors to exit (an error in any of them aborts the join)
        if let Err(e) = try_join_all(vec![
            application_handle,
            buffer_handle,
            marshal_handle,
            consensus_handle,
        ])
        .await
        {
            error!(?e, "engine failed");
        } else {
            warn!("engine stopped");
        }
    }
}