// alto_chain/engine.rs

use crate::{application, indexer, indexer::Indexer, supervisor::Supervisor};
use alto_types::{Activity, Block, Evaluation, NAMESPACE};
use commonware_broadcast::buffered;
use commonware_consensus::{
    marshal,
    threshold_simplex::{self, Engine as Consensus},
    Reporters,
};
use commonware_cryptography::{
    bls12381::primitives::{
        group,
        poly::{public, Poly},
        variant::MinSig,
    },
    ed25519::{PrivateKey, PublicKey},
    sha256::Digest,
    Signer,
};
use commonware_p2p::{Blocker, Receiver, Sender};
use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
use futures::future::try_join_all;
use governor::clock::Clock as GClock;
use governor::Quota;
use rand::{CryptoRng, Rng};
use std::time::Duration;
use tracing::{error, warn};

28/// Reporter type for [threshold_simplex::Engine].
29type Reporter<E, I> =
30    Reporters<Activity, marshal::Mailbox<MinSig, Block>, Option<indexer::Pusher<E, I>>>;
31
/// To better support peers near tip during network instability, we multiply
/// the consensus activity timeout by this factor (the product is used as
/// marshal's `view_retention_timeout`).
const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;
/// Passed to marshal as `prunable_items_per_section`.
const PRUNABLE_ITEMS_PER_SECTION: u64 = 4_096;
/// Passed to marshal as `immutable_items_per_section`.
const IMMUTABLE_ITEMS_PER_SECTION: u64 = 262_144;
/// Passed to marshal as `freezer_table_resize_frequency`.
const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;
/// Passed to marshal as `freezer_table_resize_chunk_size`.
const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16); // 3MB
/// Passed to marshal as `freezer_journal_target_size`.
const FREEZER_JOURNAL_TARGET_SIZE: u64 = 1024 * 1024 * 1024; // 1GB
/// Passed to marshal as `freezer_journal_compression`.
const FREEZER_JOURNAL_COMPRESSION: Option<u8> = Some(3);
/// Replay buffer size shared by the marshal and consensus configurations.
const REPLAY_BUFFER: usize = 8 * 1024 * 1024; // 8MB
/// Write buffer size shared by the marshal and consensus configurations.
const WRITE_BUFFER: usize = 1024 * 1024; // 1MB
/// Passed to marshal as `max_repair`.
const MAX_REPAIR: u64 = 20;

45/// Configuration for the [Engine].
46pub struct Config<B: Blocker<PublicKey = PublicKey>, I: Indexer> {
47    pub blocker: B,
48    pub partition_prefix: String,
49    pub blocks_freezer_table_initial_size: u32,
50    pub finalized_freezer_table_initial_size: u32,
51    pub signer: PrivateKey,
52    pub polynomial: Poly<Evaluation>,
53    pub share: group::Share,
54    pub participants: Vec<PublicKey>,
55    pub mailbox_size: usize,
56    pub backfill_quota: Quota,
57    pub deque_size: usize,
58
59    pub leader_timeout: Duration,
60    pub notarization_timeout: Duration,
61    pub nullify_retry: Duration,
62    pub fetch_timeout: Duration,
63    pub activity_timeout: u64,
64    pub skip_timeout: u64,
65    pub max_fetch_count: usize,
66    pub max_fetch_size: usize,
67    pub fetch_concurrent: usize,
68    pub fetch_rate_per_peer: Quota,
69
70    pub indexer: Option<I>,
71}
72
73/// The engine that drives the [application].
74pub struct Engine<
75    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
76    B: Blocker<PublicKey = PublicKey>,
77    I: Indexer,
78> {
79    context: E,
80
81    application: application::Actor<E>,
82    application_mailbox: application::Mailbox,
83    buffer: buffered::Engine<E, PublicKey, Block>,
84    buffer_mailbox: buffered::Mailbox<PublicKey, Block>,
85    marshal: marshal::Actor<Block, E, MinSig, PublicKey, Supervisor>,
86    marshal_mailbox: marshal::Mailbox<MinSig, Block>,
87
88    consensus: Consensus<
89        E,
90        PrivateKey,
91        B,
92        MinSig,
93        Digest,
94        application::Mailbox,
95        application::Mailbox,
96        Reporter<E, I>,
97        Supervisor,
98    >,
99}
100
101impl<
102        E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
103        B: Blocker<PublicKey = PublicKey>,
104        I: Indexer,
105    > Engine<E, B, I>
106{
107    /// Create a new [Engine].
108    pub async fn new(context: E, cfg: Config<B, I>) -> Self {
109        // Create the application
110        let identity = *public::<MinSig>(&cfg.polynomial);
111        let (application, supervisor, application_mailbox) = application::Actor::new(
112            context.with_label("application"),
113            application::Config {
114                participants: cfg.participants.clone(),
115                polynomial: cfg.polynomial,
116                share: cfg.share,
117                mailbox_size: cfg.mailbox_size,
118            },
119        );
120
121        // Create the buffer
122        let (buffer, buffer_mailbox) = buffered::Engine::new(
123            context.with_label("buffer"),
124            buffered::Config {
125                public_key: cfg.signer.public_key(),
126                mailbox_size: cfg.mailbox_size,
127                deque_size: cfg.deque_size,
128                priority: true,
129                codec_config: (),
130            },
131        );
132
133        // Create marshal
134        let (marshal, marshal_mailbox): (_, marshal::Mailbox<MinSig, Block>) =
135            marshal::Actor::init(
136                context.with_label("marshal"),
137                marshal::Config {
138                    public_key: cfg.signer.public_key(),
139                    identity,
140                    coordinator: supervisor.clone(),
141                    partition_prefix: cfg.partition_prefix.clone(),
142                    mailbox_size: cfg.mailbox_size,
143                    backfill_quota: cfg.backfill_quota,
144                    view_retention_timeout: cfg
145                        .activity_timeout
146                        .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
147                    namespace: NAMESPACE.to_vec(),
148                    prunable_items_per_section: PRUNABLE_ITEMS_PER_SECTION,
149                    immutable_items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
150                    freezer_table_initial_size: cfg.blocks_freezer_table_initial_size,
151                    freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
152                    freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
153                    freezer_journal_target_size: FREEZER_JOURNAL_TARGET_SIZE,
154                    freezer_journal_compression: FREEZER_JOURNAL_COMPRESSION,
155                    replay_buffer: REPLAY_BUFFER,
156                    write_buffer: WRITE_BUFFER,
157                    codec_config: (),
158                    max_repair: MAX_REPAIR,
159                },
160            )
161            .await;
162
163        // Create the reporter
164        let reporter = (
165            marshal_mailbox.clone(),
166            cfg.indexer.map(|indexer| {
167                indexer::Pusher::new(
168                    context.with_label("indexer"),
169                    indexer,
170                    marshal_mailbox.clone(),
171                )
172            }),
173        )
174            .into();
175
176        // Create the consensus engine
177        let consensus = Consensus::new(
178            context.with_label("consensus"),
179            threshold_simplex::Config {
180                namespace: NAMESPACE.to_vec(),
181                crypto: cfg.signer,
182                automaton: application_mailbox.clone(),
183                relay: application_mailbox.clone(),
184                reporter,
185                supervisor,
186                partition: format!("{}-consensus", cfg.partition_prefix),
187                compression: None,
188                mailbox_size: cfg.mailbox_size,
189                leader_timeout: cfg.leader_timeout,
190                notarization_timeout: cfg.notarization_timeout,
191                nullify_retry: cfg.nullify_retry,
192                fetch_timeout: cfg.fetch_timeout,
193                activity_timeout: cfg.activity_timeout,
194                skip_timeout: cfg.skip_timeout,
195                max_fetch_count: cfg.max_fetch_count,
196                fetch_concurrent: cfg.fetch_concurrent,
197                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
198                replay_buffer: REPLAY_BUFFER,
199                write_buffer: WRITE_BUFFER,
200                blocker: cfg.blocker,
201            },
202        );
203
204        // Return the engine
205        Self {
206            context,
207
208            application,
209            application_mailbox,
210            buffer,
211            buffer_mailbox,
212            marshal,
213            marshal_mailbox,
214            consensus,
215        }
216    }
217
218    /// Start the [threshold_simplex::Engine].
219    #[allow(clippy::too_many_arguments)]
220    pub fn start(
221        self,
222        pending_network: (
223            impl Sender<PublicKey = PublicKey>,
224            impl Receiver<PublicKey = PublicKey>,
225        ),
226        recovered_network: (
227            impl Sender<PublicKey = PublicKey>,
228            impl Receiver<PublicKey = PublicKey>,
229        ),
230        resolver_network: (
231            impl Sender<PublicKey = PublicKey>,
232            impl Receiver<PublicKey = PublicKey>,
233        ),
234        broadcast_network: (
235            impl Sender<PublicKey = PublicKey>,
236            impl Receiver<PublicKey = PublicKey>,
237        ),
238        backfill_network: (
239            impl Sender<PublicKey = PublicKey>,
240            impl Receiver<PublicKey = PublicKey>,
241        ),
242    ) -> Handle<()> {
243        self.context.clone().spawn(|_| {
244            self.run(
245                pending_network,
246                recovered_network,
247                resolver_network,
248                broadcast_network,
249                backfill_network,
250            )
251        })
252    }
253
254    #[allow(clippy::too_many_arguments)]
255    async fn run(
256        self,
257        pending_network: (
258            impl Sender<PublicKey = PublicKey>,
259            impl Receiver<PublicKey = PublicKey>,
260        ),
261        recovered_network: (
262            impl Sender<PublicKey = PublicKey>,
263            impl Receiver<PublicKey = PublicKey>,
264        ),
265        resolver_network: (
266            impl Sender<PublicKey = PublicKey>,
267            impl Receiver<PublicKey = PublicKey>,
268        ),
269        broadcast_network: (
270            impl Sender<PublicKey = PublicKey>,
271            impl Receiver<PublicKey = PublicKey>,
272        ),
273        backfill_network: (
274            impl Sender<PublicKey = PublicKey>,
275            impl Receiver<PublicKey = PublicKey>,
276        ),
277    ) {
278        // Start the application
279        let application_handle = self.application.start(self.marshal_mailbox);
280
281        // Start the buffer
282        let buffer_handle = self.buffer.start(broadcast_network);
283
284        // Start marshal
285        let marshal_handle = self.marshal.start(
286            self.application_mailbox,
287            self.buffer_mailbox,
288            backfill_network,
289        );
290
291        // Start consensus
292        //
293        // We start the application prior to consensus to ensure we can handle enqueued events from consensus (otherwise
294        // restart could block).
295        let consensus_handle =
296            self.consensus
297                .start(pending_network, recovered_network, resolver_network);
298
299        // Wait for any actor to finish
300        if let Err(e) = try_join_all(vec![
301            application_handle,
302            buffer_handle,
303            marshal_handle,
304            consensus_handle,
305        ])
306        .await
307        {
308            error!(?e, "engine failed");
309        } else {
310            warn!("engine stopped");
311        }
312    }
313}