1use crate::{application, indexer, indexer::Indexer, supervisor::Supervisor};
2use alto_types::{Activity, Block, Evaluation, NAMESPACE};
3use commonware_broadcast::buffered;
4use commonware_consensus::{
5 marshal,
6 threshold_simplex::{self, Engine as Consensus},
7 Reporters,
8};
9use commonware_cryptography::{
10 bls12381::primitives::{
11 group,
12 poly::{public, Poly},
13 variant::MinSig,
14 },
15 ed25519::{PrivateKey, PublicKey},
16 sha256::Digest,
17 Signer,
18};
19use commonware_p2p::{Blocker, Receiver, Sender};
20use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
21use futures::future::try_join_all;
22use governor::clock::Clock as GClock;
23use governor::Quota;
24use rand::{CryptoRng, Rng};
25use std::time::Duration;
26use tracing::{error, warn};
27
/// Composite reporter handed to consensus: pairs the marshal mailbox with an
/// optional [`indexer::Pusher`] (present only when an indexer is configured),
/// combined via [`Reporters`] so both receive each reported [`Activity`].
type Reporter<E, I> =
    Reporters<Activity, marshal::Mailbox<MinSig, Block>, Option<indexer::Pusher<E, I>>>;
31
/// Multiplier applied to `activity_timeout` to derive the marshal's
/// `view_retention_timeout` (see [`Engine::new`]).
const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;

/// Items per prunable storage section (passed to `marshal::Config`).
const PRUNABLE_ITEMS_PER_SECTION: u64 = 4_096;

/// Items per immutable storage section (passed to `marshal::Config`).
const IMMUTABLE_ITEMS_PER_SECTION: u64 = 262_144;

/// How often the marshal's freezer table is resized (units defined by marshal).
const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;

/// Entries added per freezer-table resize (65_536).
const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16);

/// Target size of a freezer journal in bytes (1 GiB).
const FREEZER_JOURNAL_TARGET_SIZE: u64 = 1024 * 1024 * 1024;

/// Compression level for freezer journals; `None` would disable compression.
/// NOTE(review): presumably a zstd level — confirm against the marshal docs.
const FREEZER_JOURNAL_COMPRESSION: Option<u8> = Some(3);

/// Replay buffer size in bytes (8 MiB), shared by marshal and consensus.
const REPLAY_BUFFER: usize = 8 * 1024 * 1024;

/// Write buffer size in bytes (1 MiB), shared by marshal and consensus.
const WRITE_BUFFER: usize = 1024 * 1024;

/// Maximum repair operations the marshal performs at once.
const MAX_REPAIR: u64 = 20;
44
/// Configuration required to build an [`Engine`] via [`Engine::new`].
pub struct Config<B: Blocker<PublicKey = PublicKey>, I: Indexer> {
    // Peer blocker, forwarded to consensus (`threshold_simplex::Config::blocker`).
    pub blocker: B,
    // Storage partition prefix: used verbatim by the marshal and with a
    // `-consensus` suffix by consensus.
    pub partition_prefix: String,
    // Initial size of the marshal's freezer table.
    pub blocks_freezer_table_initial_size: u32,
    // NOTE(review): not read anywhere in this file — confirm whether it should
    // be wired into a second (finalized) freezer table or removed.
    pub finalized_freezer_table_initial_size: u32,
    // Node signing key: its public key identifies this node to the buffer and
    // marshal; the key itself becomes consensus `crypto`.
    pub signer: PrivateKey,
    // Public polynomial: the network identity is derived from it and it is
    // moved into the application actor.
    pub polynomial: Poly<Evaluation>,
    // This node's threshold share, handed to the application actor.
    pub share: group::Share,
    // Validator public keys, handed to the application actor.
    pub participants: Vec<PublicKey>,
    // Mailbox capacity used for every actor constructed by `Engine::new`.
    pub mailbox_size: usize,
    // Rate limit for marshal backfill requests.
    pub backfill_quota: Quota,
    // Capacity of the buffered-broadcast deque.
    pub deque_size: usize,

    // The following are passed through to `threshold_simplex::Config`
    // (field of the same name) unless noted otherwise.
    pub leader_timeout: Duration,
    pub notarization_timeout: Duration,
    pub nullify_retry: Duration,
    pub fetch_timeout: Duration,
    // Also scaled by SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER to derive the
    // marshal's `view_retention_timeout`.
    pub activity_timeout: u64,
    pub skip_timeout: u64,
    pub max_fetch_count: usize,
    // NOTE(review): not read anywhere in this file — confirm whether consensus
    // should receive it or it can be removed.
    pub max_fetch_size: usize,
    pub fetch_concurrent: usize,
    pub fetch_rate_per_peer: Quota,

    // Optional indexer; when `Some`, an `indexer::Pusher` is added to the
    // consensus reporter (see `Reporter`).
    pub indexer: Option<I>,
}
72
/// Top-level node engine: owns the application, buffered-broadcast, marshal,
/// and consensus actors plus the mailboxes used to wire them together when
/// the engine is started.
pub struct Engine<
    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
    B: Blocker<PublicKey = PublicKey>,
    I: Indexer,
> {
    // Runtime context; cloned in `start` to spawn the root engine task.
    context: E,

    // Application actor and the mailbox other actors use to reach it.
    application: application::Actor<E>,
    application_mailbox: application::Mailbox,
    // Buffered-broadcast engine and its mailbox (given to the marshal in `run`).
    buffer: buffered::Engine<E, PublicKey, Block>,
    buffer_mailbox: buffered::Mailbox<PublicKey, Block>,
    // Marshal actor and its mailbox (given to the application in `run`).
    marshal: marshal::Actor<Block, E, MinSig, PublicKey, Supervisor>,
    marshal_mailbox: marshal::Mailbox<MinSig, Block>,

    // Fully-wired consensus instance (constructed in `Engine::new`).
    consensus: Consensus<
        E,
        PrivateKey,
        B,
        MinSig,
        Digest,
        application::Mailbox,
        application::Mailbox,
        Reporter<E, I>,
        Supervisor,
    >,
}
100
impl<
        E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
        B: Blocker<PublicKey = PublicKey>,
        I: Indexer,
    > Engine<E, B, I>
{
    /// Construct every actor (application, buffered broadcast, marshal,
    /// consensus) from `cfg` without starting any of them; call
    /// [`Engine::start`] to run the engine.
    ///
    /// `async` because `marshal::Actor::init` is awaited (it presumably opens
    /// storage — confirm against the marshal docs).
    pub async fn new(context: E, cfg: Config<B, I>) -> Self {
        // Derive the shared network identity from the public polynomial
        // (must happen before `cfg.polynomial` is moved below).
        let identity = *public::<MinSig>(&cfg.polynomial);

        // The application actor also yields the supervisor shared by the
        // marshal (as coordinator) and consensus.
        let (application, supervisor, application_mailbox) = application::Actor::new(
            context.with_label("application"),
            application::Config {
                participants: cfg.participants.clone(),
                polynomial: cfg.polynomial,
                share: cfg.share,
                mailbox_size: cfg.mailbox_size,
            },
        );

        // Buffered broadcast engine for block dissemination.
        let (buffer, buffer_mailbox) = buffered::Engine::new(
            context.with_label("buffer"),
            buffered::Config {
                public_key: cfg.signer.public_key(),
                mailbox_size: cfg.mailbox_size,
                deque_size: cfg.deque_size,
                priority: true,
                codec_config: (),
            },
        );

        // Marshal actor: persists and backfills blocks. Storage sizing comes
        // from the module constants above; `view_retention_timeout` is the
        // consensus activity timeout scaled by a safety multiplier.
        let (marshal, marshal_mailbox): (_, marshal::Mailbox<MinSig, Block>) =
            marshal::Actor::init(
                context.with_label("marshal"),
                marshal::Config {
                    public_key: cfg.signer.public_key(),
                    identity,
                    coordinator: supervisor.clone(),
                    partition_prefix: cfg.partition_prefix.clone(),
                    mailbox_size: cfg.mailbox_size,
                    backfill_quota: cfg.backfill_quota,
                    view_retention_timeout: cfg
                        .activity_timeout
                        .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
                    namespace: NAMESPACE.to_vec(),
                    prunable_items_per_section: PRUNABLE_ITEMS_PER_SECTION,
                    immutable_items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
                    freezer_table_initial_size: cfg.blocks_freezer_table_initial_size,
                    freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
                    freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
                    freezer_journal_target_size: FREEZER_JOURNAL_TARGET_SIZE,
                    freezer_journal_compression: FREEZER_JOURNAL_COMPRESSION,
                    replay_buffer: REPLAY_BUFFER,
                    write_buffer: WRITE_BUFFER,
                    codec_config: (),
                    max_repair: MAX_REPAIR,
                },
            )
            .await;

        // Consensus reporter: always the marshal mailbox, plus an indexer
        // pusher when an indexer was configured (tuple converted via
        // `Reporters`' `Into` impl — see the `Reporter` alias).
        let reporter = (
            marshal_mailbox.clone(),
            cfg.indexer.map(|indexer| {
                indexer::Pusher::new(
                    context.with_label("indexer"),
                    indexer,
                    marshal_mailbox.clone(),
                )
            }),
        )
            .into();

        // Consensus engine: the application mailbox serves as both automaton
        // and relay; its storage partition is namespaced under the prefix.
        let consensus = Consensus::new(
            context.with_label("consensus"),
            threshold_simplex::Config {
                namespace: NAMESPACE.to_vec(),
                crypto: cfg.signer,
                automaton: application_mailbox.clone(),
                relay: application_mailbox.clone(),
                reporter,
                supervisor,
                partition: format!("{}-consensus", cfg.partition_prefix),
                compression: None,
                mailbox_size: cfg.mailbox_size,
                leader_timeout: cfg.leader_timeout,
                notarization_timeout: cfg.notarization_timeout,
                nullify_retry: cfg.nullify_retry,
                fetch_timeout: cfg.fetch_timeout,
                activity_timeout: cfg.activity_timeout,
                skip_timeout: cfg.skip_timeout,
                max_fetch_count: cfg.max_fetch_count,
                fetch_concurrent: cfg.fetch_concurrent,
                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
                blocker: cfg.blocker,
            },
        );

        Self {
            context,

            application,
            application_mailbox,
            buffer,
            buffer_mailbox,
            marshal,
            marshal_mailbox,
            consensus,
        }
    }

    /// Spawn [`Self::run`] on the engine's runtime context and return the
    /// handle for the root engine task. Each argument is a (sender, receiver)
    /// pair for one of the five dedicated network channels.
    // Five channel pairs are inherent to the design; suppressing the lint
    // beats bundling them into an ad-hoc struct.
    #[allow(clippy::too_many_arguments)]
    pub fn start(
        self,
        pending_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        recovered_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        broadcast_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        backfill_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) -> Handle<()> {
        self.context.clone().spawn(|_| {
            self.run(
                pending_network,
                recovered_network,
                resolver_network,
                broadcast_network,
                backfill_network,
            )
        })
    }

    /// Start every actor, distributing the network channels and mailboxes,
    /// then wait for all of them. Returns when any actor fails (logged as an
    /// error) or when all of them stop (logged as a warning).
    #[allow(clippy::too_many_arguments)]
    async fn run(
        self,
        pending_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        recovered_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        resolver_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        broadcast_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
        backfill_network: (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        // The application talks to the marshal…
        let application_handle = self.application.start(self.marshal_mailbox);

        // …the buffer owns the broadcast channel…
        let buffer_handle = self.buffer.start(broadcast_network);

        // …and the marshal gets the application and buffer mailboxes plus the
        // backfill channel.
        let marshal_handle = self.marshal.start(
            self.application_mailbox,
            self.buffer_mailbox,
            backfill_network,
        );

        // Consensus drives the three remaining channels.
        let consensus_handle =
            self.consensus
                .start(pending_network, recovered_network, resolver_network);

        // `try_join_all` resolves on the first error or once every task has
        // finished cleanly.
        if let Err(e) = try_join_all(vec![
            application_handle,
            buffer_handle,
            marshal_handle,
            consensus_handle,
        ])
        .await
        {
            error!(?e, "engine failed");
        } else {
            warn!("engine stopped");
        }
    }
}