alto_chain/
engine.rs

1use crate::{
2    actors::{application, syncer},
3    Indexer,
4};
5use alto_types::{Block, Evaluation, NAMESPACE};
6use commonware_broadcast::buffered;
7use commonware_consensus::threshold_simplex::{self, Engine as Consensus};
8use commonware_cryptography::{
9    bls12381::primitives::{
10        group,
11        poly::{public, Poly},
12        variant::MinSig,
13    },
14    ed25519::{PrivateKey, PublicKey},
15    sha256::Digest,
16    Signer,
17};
18use commonware_p2p::{Blocker, Receiver, Sender};
19use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
20use futures::future::try_join_all;
21use governor::clock::Clock as GClock;
22use governor::Quota;
23use rand::{CryptoRng, Rng};
24use std::time::Duration;
25use tracing::{error, warn};
26
27/// To better support peers near tip during network instability, we multiply
28/// the consensus activity timeout by this factor.
29const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;
30const REPLAY_BUFFER: usize = 8 * 1024 * 1024;
31const WRITE_BUFFER: usize = 1024 * 1024;
32
33pub struct Config<B: Blocker<PublicKey = PublicKey>, I: Indexer> {
34    pub blocker: B,
35    pub partition_prefix: String,
36    pub blocks_freezer_table_initial_size: u32,
37    pub finalized_freezer_table_initial_size: u32,
38    pub signer: PrivateKey,
39    pub polynomial: Poly<Evaluation>,
40    pub share: group::Share,
41    pub participants: Vec<PublicKey>,
42    pub mailbox_size: usize,
43    pub backfill_quota: Quota,
44    pub deque_size: usize,
45
46    pub leader_timeout: Duration,
47    pub notarization_timeout: Duration,
48    pub nullify_retry: Duration,
49    pub fetch_timeout: Duration,
50    pub activity_timeout: u64,
51    pub skip_timeout: u64,
52    pub max_fetch_count: usize,
53    pub max_fetch_size: usize,
54    pub fetch_concurrent: usize,
55    pub fetch_rate_per_peer: Quota,
56
57    pub indexer: Option<I>,
58}
59
60pub struct Engine<
61    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
62    B: Blocker<PublicKey = PublicKey>,
63    I: Indexer,
64> {
65    context: E,
66
67    application: application::Actor<E>,
68    buffer: buffered::Engine<E, PublicKey, Block>,
69    buffer_mailbox: buffered::Mailbox<PublicKey, Block>,
70    syncer: syncer::Actor<E, I>,
71    syncer_mailbox: syncer::Mailbox,
72    consensus: Consensus<
73        E,
74        PrivateKey,
75        B,
76        MinSig,
77        Digest,
78        application::Mailbox,
79        application::Mailbox,
80        syncer::Mailbox,
81        application::Supervisor,
82    >,
83}
84
85impl<
86        E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
87        B: Blocker<PublicKey = PublicKey>,
88        I: Indexer,
89    > Engine<E, B, I>
90{
91    pub async fn new(context: E, cfg: Config<B, I>) -> Self {
92        // Create the application
93        let identity = *public::<MinSig>(&cfg.polynomial);
94        let (application, supervisor, application_mailbox) = application::Actor::new(
95            context.with_label("application"),
96            application::Config {
97                participants: cfg.participants.clone(),
98                polynomial: cfg.polynomial,
99                share: cfg.share,
100                mailbox_size: cfg.mailbox_size,
101            },
102        );
103
104        // Create the buffer
105        let (buffer, buffer_mailbox) = buffered::Engine::new(
106            context.with_label("buffer"),
107            buffered::Config {
108                public_key: cfg.signer.public_key(),
109                mailbox_size: cfg.mailbox_size,
110                deque_size: cfg.deque_size,
111                priority: true,
112                codec_config: (),
113            },
114        );
115
116        // Create the syncer
117        let (syncer, syncer_mailbox) = syncer::Actor::init(
118            context.with_label("syncer"),
119            syncer::Config {
120                partition_prefix: cfg.partition_prefix.clone(),
121                public_key: cfg.signer.public_key(),
122                identity,
123                participants: cfg.participants,
124                blocks_freezer_table_initial_size: cfg.blocks_freezer_table_initial_size,
125                finalized_freezer_table_initial_size: cfg.finalized_freezer_table_initial_size,
126                mailbox_size: cfg.mailbox_size,
127                backfill_quota: cfg.backfill_quota,
128                activity_timeout: cfg
129                    .activity_timeout
130                    .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
131                indexer: cfg.indexer,
132            },
133        )
134        .await;
135
136        // Create the consensus engine
137        let consensus = Consensus::new(
138            context.with_label("consensus"),
139            threshold_simplex::Config {
140                namespace: NAMESPACE.to_vec(),
141                crypto: cfg.signer,
142                automaton: application_mailbox.clone(),
143                relay: application_mailbox.clone(),
144                reporter: syncer_mailbox.clone(),
145                supervisor,
146                partition: format!("{}-consensus", cfg.partition_prefix),
147                compression: None,
148                mailbox_size: cfg.mailbox_size,
149                leader_timeout: cfg.leader_timeout,
150                notarization_timeout: cfg.notarization_timeout,
151                nullify_retry: cfg.nullify_retry,
152                fetch_timeout: cfg.fetch_timeout,
153                activity_timeout: cfg.activity_timeout,
154                skip_timeout: cfg.skip_timeout,
155                max_fetch_count: cfg.max_fetch_count,
156                fetch_concurrent: cfg.fetch_concurrent,
157                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
158                replay_buffer: REPLAY_BUFFER,
159                write_buffer: WRITE_BUFFER,
160                blocker: cfg.blocker,
161            },
162        );
163
164        // Return the engine
165        Self {
166            context,
167
168            application,
169            buffer,
170            buffer_mailbox,
171            syncer,
172            syncer_mailbox,
173            consensus,
174        }
175    }
176
177    /// Start the `simplex` consensus engine.
178    ///
179    /// This will also rebuild the state of the engine from provided `Journal`.
180    pub fn start(
181        self,
182        pending_network: (
183            impl Sender<PublicKey = PublicKey>,
184            impl Receiver<PublicKey = PublicKey>,
185        ),
186        recovered_network: (
187            impl Sender<PublicKey = PublicKey>,
188            impl Receiver<PublicKey = PublicKey>,
189        ),
190        resolver_network: (
191            impl Sender<PublicKey = PublicKey>,
192            impl Receiver<PublicKey = PublicKey>,
193        ),
194        broadcast_network: (
195            impl Sender<PublicKey = PublicKey>,
196            impl Receiver<PublicKey = PublicKey>,
197        ),
198        backfill_network: (
199            impl Sender<PublicKey = PublicKey>,
200            impl Receiver<PublicKey = PublicKey>,
201        ),
202    ) -> Handle<()> {
203        self.context.clone().spawn(|_| {
204            self.run(
205                pending_network,
206                recovered_network,
207                resolver_network,
208                broadcast_network,
209                backfill_network,
210            )
211        })
212    }
213
214    async fn run(
215        self,
216        pending_network: (
217            impl Sender<PublicKey = PublicKey>,
218            impl Receiver<PublicKey = PublicKey>,
219        ),
220        recovered_network: (
221            impl Sender<PublicKey = PublicKey>,
222            impl Receiver<PublicKey = PublicKey>,
223        ),
224        resolver_network: (
225            impl Sender<PublicKey = PublicKey>,
226            impl Receiver<PublicKey = PublicKey>,
227        ),
228        broadcast_network: (
229            impl Sender<PublicKey = PublicKey>,
230            impl Receiver<PublicKey = PublicKey>,
231        ),
232        backfill_network: (
233            impl Sender<PublicKey = PublicKey>,
234            impl Receiver<PublicKey = PublicKey>,
235        ),
236    ) {
237        // Start the application
238        let application_handle = self.application.start(self.syncer_mailbox);
239
240        // Start the buffer
241        let buffer_handle = self.buffer.start(broadcast_network);
242
243        // Start the syncer
244        let syncer_handle = self.syncer.start(self.buffer_mailbox, backfill_network);
245
246        // Start consensus
247        //
248        // We start the application prior to consensus to ensure we can handle enqueued events from consensus (otherwise
249        // restart could block).
250        let consensus_handle =
251            self.consensus
252                .start(pending_network, recovered_network, resolver_network);
253
254        // Wait for any actor to finish
255        if let Err(e) = try_join_all(vec![
256            application_handle,
257            buffer_handle,
258            syncer_handle,
259            consensus_handle,
260        ])
261        .await
262        {
263            error!(?e, "engine failed");
264        } else {
265            warn!("engine stopped");
266        }
267    }
268}