1use crate::{application, indexer, indexer::Indexer, StaticSchemeProvider};
2use alto_types::{Activity, Block, Evaluation, Scheme, EPOCH, EPOCH_LENGTH, NAMESPACE};
3use commonware_broadcast::buffered;
4use commonware_consensus::{
5 marshal::{self, ingress::handler},
6 simplex::{self, Engine as Consensus},
7 Reporters,
8};
9use commonware_cryptography::{
10 bls12381::primitives::{group, poly::Poly},
11 ed25519::PublicKey,
12 sha256::Digest,
13};
14use commonware_p2p::{Blocker, Receiver, Sender};
15use commonware_resolver::Resolver;
16use commonware_runtime::{
17 buffer::PoolRef, spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
18};
19use commonware_utils::set::Ordered;
20use commonware_utils::{NZUsize, NZU64};
21use futures::{channel::mpsc, future::try_join_all};
22use governor::clock::Clock as GClock;
23use governor::Quota;
24use rand::{CryptoRng, Rng};
25use std::marker::PhantomData;
26use std::{num::NonZero, time::Duration};
27use tracing::{error, warn};
28
29type Reporter<E, I> =
31 Reporters<Activity, marshal::Mailbox<Scheme, Block>, Option<indexer::Pusher<E, I>>>;
32
33const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;
36const PRUNABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(4_096);
37const IMMUTABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(262_144);
38const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;
39const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16); const FREEZER_JOURNAL_TARGET_SIZE: u64 = 1024 * 1024 * 1024; const FREEZER_JOURNAL_COMPRESSION: Option<u8> = Some(3);
42const REPLAY_BUFFER: NonZero<usize> = NZUsize!(8 * 1024 * 1024); const WRITE_BUFFER: NonZero<usize> = NZUsize!(1024 * 1024); const BUFFER_POOL_PAGE_SIZE: NonZero<usize> = NZUsize!(4_096); const BUFFER_POOL_CAPACITY: NonZero<usize> = NZUsize!(8_192); const MAX_REPAIR: u64 = 20;
47
48pub struct Config<B: Blocker<PublicKey = PublicKey>, I: Indexer> {
50 pub blocker: B,
51 pub partition_prefix: String,
52 pub blocks_freezer_table_initial_size: u32,
53 pub finalized_freezer_table_initial_size: u32,
54 pub me: PublicKey,
55 pub polynomial: Poly<Evaluation>,
56 pub share: group::Share,
57 pub participants: Ordered<PublicKey>,
58 pub mailbox_size: usize,
59 pub deque_size: usize,
60
61 pub leader_timeout: Duration,
62 pub notarization_timeout: Duration,
63 pub nullify_retry: Duration,
64 pub fetch_timeout: Duration,
65 pub activity_timeout: u64,
66 pub skip_timeout: u64,
67 pub max_fetch_count: usize,
68 pub max_fetch_size: usize,
69 pub fetch_concurrent: usize,
70 pub fetch_rate_per_peer: Quota,
71
72 pub indexer: Option<I>,
73}
74
75pub struct Engine<
77 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
78 B: Blocker<PublicKey = PublicKey>,
79 I: Indexer,
80> {
81 context: ContextCell<E>,
82
83 application: application::Actor<E>,
84 application_mailbox: application::Mailbox,
85 buffer: buffered::Engine<E, PublicKey, Block>,
86 buffer_mailbox: buffered::Mailbox<PublicKey, Block>,
87 marshal: marshal::Actor<E, Block, StaticSchemeProvider, Scheme>,
88 marshal_mailbox: marshal::Mailbox<Scheme, Block>,
89
90 consensus: Consensus<
91 E,
92 PublicKey,
93 Scheme,
94 B,
95 Digest,
96 application::Mailbox,
97 application::Mailbox,
98 Reporter<E, I>,
99 >,
100}
101
102impl<
103 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
104 B: Blocker<PublicKey = PublicKey>,
105 I: Indexer,
106 > Engine<E, B, I>
107{
108 pub async fn new(context: E, cfg: Config<B, I>) -> Self {
110 let (application, application_mailbox) = application::Actor::new(
112 context.with_label("application"),
113 application::Config {
114 mailbox_size: cfg.mailbox_size,
115 },
116 );
117
118 let (buffer, buffer_mailbox) = buffered::Engine::new(
120 context.with_label("buffer"),
121 buffered::Config {
122 public_key: cfg.me,
123 mailbox_size: cfg.mailbox_size,
124 deque_size: cfg.deque_size,
125 priority: true,
126 codec_config: (),
127 },
128 );
129
130 let buffer_pool = PoolRef::new(BUFFER_POOL_PAGE_SIZE, BUFFER_POOL_CAPACITY);
132
133 let scheme = Scheme::new(cfg.participants, &cfg.polynomial, cfg.share);
135
136 let (marshal, marshal_mailbox) = marshal::Actor::init(
138 context.with_label("marshal"),
139 marshal::Config {
140 scheme_provider: scheme.clone().into(),
141 epoch_length: EPOCH_LENGTH,
142 partition_prefix: cfg.partition_prefix.clone(),
143 mailbox_size: cfg.mailbox_size,
144 view_retention_timeout: cfg
145 .activity_timeout
146 .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
147 namespace: NAMESPACE.to_vec(),
148 prunable_items_per_section: PRUNABLE_ITEMS_PER_SECTION,
149 immutable_items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
150 freezer_table_initial_size: cfg.blocks_freezer_table_initial_size,
151 freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
152 freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
153 freezer_journal_target_size: FREEZER_JOURNAL_TARGET_SIZE,
154 freezer_journal_compression: FREEZER_JOURNAL_COMPRESSION,
155 freezer_journal_buffer_pool: buffer_pool.clone(),
156 replay_buffer: REPLAY_BUFFER,
157 write_buffer: WRITE_BUFFER,
158 block_codec_config: (),
159 max_repair: MAX_REPAIR,
160 _marker: PhantomData,
161 },
162 )
163 .await;
164
165 let reporter = (
167 marshal_mailbox.clone(),
168 cfg.indexer.map(|indexer| {
169 indexer::Pusher::new(
170 context.with_label("indexer"),
171 indexer,
172 marshal_mailbox.clone(),
173 )
174 }),
175 )
176 .into();
177
178 let consensus = Consensus::new(
180 context.with_label("consensus"),
181 simplex::Config {
182 epoch: EPOCH,
183 namespace: NAMESPACE.to_vec(),
184 scheme,
185 automaton: application_mailbox.clone(),
186 relay: application_mailbox.clone(),
187 reporter,
188 partition: format!("{}-consensus", cfg.partition_prefix),
189 mailbox_size: cfg.mailbox_size,
190 leader_timeout: cfg.leader_timeout,
191 notarization_timeout: cfg.notarization_timeout,
192 nullify_retry: cfg.nullify_retry,
193 fetch_timeout: cfg.fetch_timeout,
194 activity_timeout: cfg.activity_timeout,
195 skip_timeout: cfg.skip_timeout,
196 max_fetch_count: cfg.max_fetch_count,
197 fetch_concurrent: cfg.fetch_concurrent,
198 fetch_rate_per_peer: cfg.fetch_rate_per_peer,
199 replay_buffer: REPLAY_BUFFER,
200 write_buffer: WRITE_BUFFER,
201 blocker: cfg.blocker,
202 buffer_pool,
203 },
204 );
205
206 Self {
208 context: ContextCell::new(context),
209
210 application,
211 application_mailbox,
212 buffer,
213 buffer_mailbox,
214 marshal,
215 marshal_mailbox,
216 consensus,
217 }
218 }
219
220 #[allow(clippy::too_many_arguments)]
222 pub fn start(
223 mut self,
224 pending: (
225 impl Sender<PublicKey = PublicKey>,
226 impl Receiver<PublicKey = PublicKey>,
227 ),
228 recovered: (
229 impl Sender<PublicKey = PublicKey>,
230 impl Receiver<PublicKey = PublicKey>,
231 ),
232 resolver: (
233 impl Sender<PublicKey = PublicKey>,
234 impl Receiver<PublicKey = PublicKey>,
235 ),
236 broadcast: (
237 impl Sender<PublicKey = PublicKey>,
238 impl Receiver<PublicKey = PublicKey>,
239 ),
240 marshal: (
241 mpsc::Receiver<handler::Message<Block>>,
242 impl Resolver<Key = handler::Request<Block>>,
243 ),
244 ) -> Handle<()> {
245 spawn_cell!(
246 self.context,
247 self.run(pending, recovered, resolver, broadcast, marshal,)
248 .await
249 )
250 }
251
252 #[allow(clippy::too_many_arguments)]
253 async fn run(
254 self,
255 pending: (
256 impl Sender<PublicKey = PublicKey>,
257 impl Receiver<PublicKey = PublicKey>,
258 ),
259 recovered: (
260 impl Sender<PublicKey = PublicKey>,
261 impl Receiver<PublicKey = PublicKey>,
262 ),
263 resolver: (
264 impl Sender<PublicKey = PublicKey>,
265 impl Receiver<PublicKey = PublicKey>,
266 ),
267 broadcast: (
268 impl Sender<PublicKey = PublicKey>,
269 impl Receiver<PublicKey = PublicKey>,
270 ),
271 marshal: (
272 mpsc::Receiver<handler::Message<Block>>,
273 impl Resolver<Key = handler::Request<Block>>,
274 ),
275 ) {
276 let application_handle = self.application.start(self.marshal_mailbox);
278
279 let buffer_handle = self.buffer.start(broadcast);
281
282 let marshal_handle =
284 self.marshal
285 .start(self.application_mailbox, self.buffer_mailbox, marshal);
286
287 let consensus_handle = self.consensus.start(pending, recovered, resolver);
292
293 if let Err(e) = try_join_all(vec![
295 application_handle,
296 buffer_handle,
297 marshal_handle,
298 consensus_handle,
299 ])
300 .await
301 {
302 error!(?e, "engine failed");
303 } else {
304 warn!("engine stopped");
305 }
306 }
307}