1use crate::{application, indexer, indexer::Indexer, supervisor::Supervisor};
2use alto_types::{Activity, Block, Evaluation, NAMESPACE};
3use commonware_broadcast::buffered;
4use commonware_consensus::{
5 marshal,
6 threshold_simplex::{self, Engine as Consensus},
7 Reporters,
8};
9use commonware_cryptography::{
10 bls12381::primitives::{
11 group,
12 poly::{public, Poly},
13 variant::MinSig,
14 },
15 ed25519::{PrivateKey, PublicKey},
16 sha256::Digest,
17 Signer,
18};
19use commonware_p2p::{Blocker, Receiver, Sender};
20use commonware_runtime::{buffer::PoolRef, Clock, Handle, Metrics, Spawner, Storage};
21use commonware_utils::{NZUsize, NZU64};
22use futures::future::try_join_all;
23use governor::clock::Clock as GClock;
24use governor::Quota;
25use rand::{CryptoRng, Rng};
26use std::{num::NonZero, time::Duration};
27use tracing::{error, warn};
28
29type Reporter<E, I> =
31 Reporters<Activity, marshal::Mailbox<MinSig, Block>, Option<indexer::Pusher<E, I>>>;
32
33const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;
36const PRUNABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(4_096);
37const IMMUTABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(262_144);
38const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;
39const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16); const FREEZER_JOURNAL_TARGET_SIZE: u64 = 1024 * 1024 * 1024; const FREEZER_JOURNAL_COMPRESSION: Option<u8> = Some(3);
42const REPLAY_BUFFER: NonZero<usize> = NZUsize!(8 * 1024 * 1024); const WRITE_BUFFER: NonZero<usize> = NZUsize!(1024 * 1024); const BUFFER_POOL_PAGE_SIZE: NonZero<usize> = NZUsize!(4_096); const BUFFER_POOL_CAPACITY: NonZero<usize> = NZUsize!(8_192); const MAX_REPAIR: u64 = 20;
47
48pub struct Config<B: Blocker<PublicKey = PublicKey>, I: Indexer> {
50 pub blocker: B,
51 pub partition_prefix: String,
52 pub blocks_freezer_table_initial_size: u32,
53 pub finalized_freezer_table_initial_size: u32,
54 pub signer: PrivateKey,
55 pub polynomial: Poly<Evaluation>,
56 pub share: group::Share,
57 pub participants: Vec<PublicKey>,
58 pub mailbox_size: usize,
59 pub backfill_quota: Quota,
60 pub deque_size: usize,
61
62 pub leader_timeout: Duration,
63 pub notarization_timeout: Duration,
64 pub nullify_retry: Duration,
65 pub fetch_timeout: Duration,
66 pub activity_timeout: u64,
67 pub skip_timeout: u64,
68 pub max_fetch_count: usize,
69 pub max_fetch_size: usize,
70 pub fetch_concurrent: usize,
71 pub fetch_rate_per_peer: Quota,
72
73 pub indexer: Option<I>,
74}
75
76pub struct Engine<
78 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
79 B: Blocker<PublicKey = PublicKey>,
80 I: Indexer,
81> {
82 context: E,
83
84 application: application::Actor<E>,
85 application_mailbox: application::Mailbox,
86 buffer: buffered::Engine<E, PublicKey, Block>,
87 buffer_mailbox: buffered::Mailbox<PublicKey, Block>,
88 marshal: marshal::Actor<Block, E, MinSig, PublicKey, Supervisor>,
89 marshal_mailbox: marshal::Mailbox<MinSig, Block>,
90
91 consensus: Consensus<
92 E,
93 PrivateKey,
94 B,
95 MinSig,
96 Digest,
97 application::Mailbox,
98 application::Mailbox,
99 Reporter<E, I>,
100 Supervisor,
101 >,
102}
103
104impl<
105 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
106 B: Blocker<PublicKey = PublicKey>,
107 I: Indexer,
108 > Engine<E, B, I>
109{
110 pub async fn new(context: E, cfg: Config<B, I>) -> Self {
112 let identity = *public::<MinSig>(&cfg.polynomial);
114 let (application, supervisor, application_mailbox) = application::Actor::new(
115 context.with_label("application"),
116 application::Config {
117 participants: cfg.participants.clone(),
118 polynomial: cfg.polynomial,
119 share: cfg.share,
120 mailbox_size: cfg.mailbox_size,
121 },
122 );
123
124 let (buffer, buffer_mailbox) = buffered::Engine::new(
126 context.with_label("buffer"),
127 buffered::Config {
128 public_key: cfg.signer.public_key(),
129 mailbox_size: cfg.mailbox_size,
130 deque_size: cfg.deque_size,
131 priority: true,
132 codec_config: (),
133 },
134 );
135
136 let buffer_pool = PoolRef::new(BUFFER_POOL_PAGE_SIZE, BUFFER_POOL_CAPACITY);
138
139 let (marshal, marshal_mailbox): (_, marshal::Mailbox<MinSig, Block>) =
141 marshal::Actor::init(
142 context.with_label("marshal"),
143 marshal::Config {
144 public_key: cfg.signer.public_key(),
145 identity,
146 coordinator: supervisor.clone(),
147 partition_prefix: cfg.partition_prefix.clone(),
148 mailbox_size: cfg.mailbox_size,
149 backfill_quota: cfg.backfill_quota,
150 view_retention_timeout: cfg
151 .activity_timeout
152 .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
153 namespace: NAMESPACE.to_vec(),
154 prunable_items_per_section: PRUNABLE_ITEMS_PER_SECTION,
155 immutable_items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
156 freezer_table_initial_size: cfg.blocks_freezer_table_initial_size,
157 freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
158 freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
159 freezer_journal_target_size: FREEZER_JOURNAL_TARGET_SIZE,
160 freezer_journal_compression: FREEZER_JOURNAL_COMPRESSION,
161 freezer_journal_buffer_pool: buffer_pool.clone(),
162 replay_buffer: REPLAY_BUFFER,
163 write_buffer: WRITE_BUFFER,
164 codec_config: (),
165 max_repair: MAX_REPAIR,
166 },
167 )
168 .await;
169
170 let reporter = (
172 marshal_mailbox.clone(),
173 cfg.indexer.map(|indexer| {
174 indexer::Pusher::new(
175 context.with_label("indexer"),
176 indexer,
177 marshal_mailbox.clone(),
178 )
179 }),
180 )
181 .into();
182
183 let consensus = Consensus::new(
185 context.with_label("consensus"),
186 threshold_simplex::Config {
187 namespace: NAMESPACE.to_vec(),
188 crypto: cfg.signer,
189 automaton: application_mailbox.clone(),
190 relay: application_mailbox.clone(),
191 reporter,
192 supervisor,
193 partition: format!("{}-consensus", cfg.partition_prefix),
194 compression: None,
195 mailbox_size: cfg.mailbox_size,
196 leader_timeout: cfg.leader_timeout,
197 notarization_timeout: cfg.notarization_timeout,
198 nullify_retry: cfg.nullify_retry,
199 fetch_timeout: cfg.fetch_timeout,
200 activity_timeout: cfg.activity_timeout,
201 skip_timeout: cfg.skip_timeout,
202 max_fetch_count: cfg.max_fetch_count,
203 fetch_concurrent: cfg.fetch_concurrent,
204 fetch_rate_per_peer: cfg.fetch_rate_per_peer,
205 replay_buffer: REPLAY_BUFFER,
206 write_buffer: WRITE_BUFFER,
207 blocker: cfg.blocker,
208 buffer_pool,
209 },
210 );
211
212 Self {
214 context,
215
216 application,
217 application_mailbox,
218 buffer,
219 buffer_mailbox,
220 marshal,
221 marshal_mailbox,
222 consensus,
223 }
224 }
225
226 #[allow(clippy::too_many_arguments)]
228 pub fn start(
229 self,
230 pending_network: (
231 impl Sender<PublicKey = PublicKey>,
232 impl Receiver<PublicKey = PublicKey>,
233 ),
234 recovered_network: (
235 impl Sender<PublicKey = PublicKey>,
236 impl Receiver<PublicKey = PublicKey>,
237 ),
238 resolver_network: (
239 impl Sender<PublicKey = PublicKey>,
240 impl Receiver<PublicKey = PublicKey>,
241 ),
242 broadcast_network: (
243 impl Sender<PublicKey = PublicKey>,
244 impl Receiver<PublicKey = PublicKey>,
245 ),
246 backfill_network: (
247 impl Sender<PublicKey = PublicKey>,
248 impl Receiver<PublicKey = PublicKey>,
249 ),
250 ) -> Handle<()> {
251 self.context.clone().spawn(|_| {
252 self.run(
253 pending_network,
254 recovered_network,
255 resolver_network,
256 broadcast_network,
257 backfill_network,
258 )
259 })
260 }
261
262 #[allow(clippy::too_many_arguments)]
263 async fn run(
264 self,
265 pending_network: (
266 impl Sender<PublicKey = PublicKey>,
267 impl Receiver<PublicKey = PublicKey>,
268 ),
269 recovered_network: (
270 impl Sender<PublicKey = PublicKey>,
271 impl Receiver<PublicKey = PublicKey>,
272 ),
273 resolver_network: (
274 impl Sender<PublicKey = PublicKey>,
275 impl Receiver<PublicKey = PublicKey>,
276 ),
277 broadcast_network: (
278 impl Sender<PublicKey = PublicKey>,
279 impl Receiver<PublicKey = PublicKey>,
280 ),
281 backfill_network: (
282 impl Sender<PublicKey = PublicKey>,
283 impl Receiver<PublicKey = PublicKey>,
284 ),
285 ) {
286 let application_handle = self.application.start(self.marshal_mailbox);
288
289 let buffer_handle = self.buffer.start(broadcast_network);
291
292 let marshal_handle = self.marshal.start(
294 self.application_mailbox,
295 self.buffer_mailbox,
296 backfill_network,
297 );
298
299 let consensus_handle =
304 self.consensus
305 .start(pending_network, recovered_network, resolver_network);
306
307 if let Err(e) = try_join_all(vec![
309 application_handle,
310 buffer_handle,
311 marshal_handle,
312 consensus_handle,
313 ])
314 .await
315 {
316 error!(?e, "engine failed");
317 } else {
318 warn!("engine stopped");
319 }
320 }
321}