1use crate::{
2 actors::{application, syncer},
3 Indexer,
4};
5use alto_types::{Block, Evaluation, NAMESPACE};
6use commonware_broadcast::buffered;
7use commonware_consensus::threshold_simplex::{self, Engine as Consensus};
8use commonware_cryptography::{
9 bls12381::primitives::{
10 group,
11 poly::{public, Poly},
12 variant::MinSig,
13 },
14 ed25519::{PrivateKey, PublicKey},
15 sha256::Digest,
16 Signer,
17};
18use commonware_p2p::{Blocker, Receiver, Sender};
19use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
20use futures::future::try_join_all;
21use governor::clock::Clock as GClock;
22use governor::Quota;
23use rand::{CryptoRng, Rng};
24use std::time::Duration;
25use tracing::{error, warn};
26
/// Factor applied (with saturation) to the configured `activity_timeout`
/// to obtain the `activity_timeout` handed to the syncer.
const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;

/// Value passed to consensus as `replay_buffer`: 8 * 2^20
/// (presumably a size in bytes, i.e. 8 MiB — confirm against the consensus config docs).
const REPLAY_BUFFER: usize = 8 << 20;

/// Value passed to consensus as `write_buffer`: 2^20
/// (presumably a size in bytes, i.e. 1 MiB — confirm against the consensus config docs).
const WRITE_BUFFER: usize = 1 << 20;
32
33pub struct Config<B: Blocker<PublicKey = PublicKey>, I: Indexer> {
34 pub blocker: B,
35 pub partition_prefix: String,
36 pub blocks_freezer_table_initial_size: u32,
37 pub finalized_freezer_table_initial_size: u32,
38 pub signer: PrivateKey,
39 pub polynomial: Poly<Evaluation>,
40 pub share: group::Share,
41 pub participants: Vec<PublicKey>,
42 pub mailbox_size: usize,
43 pub backfill_quota: Quota,
44 pub deque_size: usize,
45
46 pub leader_timeout: Duration,
47 pub notarization_timeout: Duration,
48 pub nullify_retry: Duration,
49 pub fetch_timeout: Duration,
50 pub activity_timeout: u64,
51 pub skip_timeout: u64,
52 pub max_fetch_count: usize,
53 pub max_fetch_size: usize,
54 pub fetch_concurrent: usize,
55 pub fetch_rate_per_peer: Quota,
56
57 pub indexer: Option<I>,
58}
59
60pub struct Engine<
61 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
62 B: Blocker<PublicKey = PublicKey>,
63 I: Indexer,
64> {
65 context: E,
66
67 application: application::Actor<E>,
68 buffer: buffered::Engine<E, PublicKey, Block>,
69 buffer_mailbox: buffered::Mailbox<PublicKey, Block>,
70 syncer: syncer::Actor<E, I>,
71 syncer_mailbox: syncer::Mailbox,
72 consensus: Consensus<
73 E,
74 PrivateKey,
75 B,
76 MinSig,
77 Digest,
78 application::Mailbox,
79 application::Mailbox,
80 syncer::Mailbox,
81 application::Supervisor,
82 >,
83}
84
85impl<
86 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
87 B: Blocker<PublicKey = PublicKey>,
88 I: Indexer,
89 > Engine<E, B, I>
90{
91 pub async fn new(context: E, cfg: Config<B, I>) -> Self {
92 let identity = *public::<MinSig>(&cfg.polynomial);
94 let (application, supervisor, application_mailbox) = application::Actor::new(
95 context.with_label("application"),
96 application::Config {
97 participants: cfg.participants.clone(),
98 polynomial: cfg.polynomial,
99 share: cfg.share,
100 mailbox_size: cfg.mailbox_size,
101 },
102 );
103
104 let (buffer, buffer_mailbox) = buffered::Engine::new(
106 context.with_label("buffer"),
107 buffered::Config {
108 public_key: cfg.signer.public_key(),
109 mailbox_size: cfg.mailbox_size,
110 deque_size: cfg.deque_size,
111 priority: true,
112 codec_config: (),
113 },
114 );
115
116 let (syncer, syncer_mailbox) = syncer::Actor::init(
118 context.with_label("syncer"),
119 syncer::Config {
120 partition_prefix: cfg.partition_prefix.clone(),
121 public_key: cfg.signer.public_key(),
122 identity,
123 participants: cfg.participants,
124 blocks_freezer_table_initial_size: cfg.blocks_freezer_table_initial_size,
125 finalized_freezer_table_initial_size: cfg.finalized_freezer_table_initial_size,
126 mailbox_size: cfg.mailbox_size,
127 backfill_quota: cfg.backfill_quota,
128 activity_timeout: cfg
129 .activity_timeout
130 .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
131 indexer: cfg.indexer,
132 },
133 )
134 .await;
135
136 let consensus = Consensus::new(
138 context.with_label("consensus"),
139 threshold_simplex::Config {
140 namespace: NAMESPACE.to_vec(),
141 crypto: cfg.signer,
142 automaton: application_mailbox.clone(),
143 relay: application_mailbox.clone(),
144 reporter: syncer_mailbox.clone(),
145 supervisor,
146 partition: format!("{}-consensus", cfg.partition_prefix),
147 compression: None,
148 mailbox_size: cfg.mailbox_size,
149 leader_timeout: cfg.leader_timeout,
150 notarization_timeout: cfg.notarization_timeout,
151 nullify_retry: cfg.nullify_retry,
152 fetch_timeout: cfg.fetch_timeout,
153 activity_timeout: cfg.activity_timeout,
154 skip_timeout: cfg.skip_timeout,
155 max_fetch_count: cfg.max_fetch_count,
156 fetch_concurrent: cfg.fetch_concurrent,
157 fetch_rate_per_peer: cfg.fetch_rate_per_peer,
158 replay_buffer: REPLAY_BUFFER,
159 write_buffer: WRITE_BUFFER,
160 blocker: cfg.blocker,
161 },
162 );
163
164 Self {
166 context,
167
168 application,
169 buffer,
170 buffer_mailbox,
171 syncer,
172 syncer_mailbox,
173 consensus,
174 }
175 }
176
177 pub fn start(
181 self,
182 pending_network: (
183 impl Sender<PublicKey = PublicKey>,
184 impl Receiver<PublicKey = PublicKey>,
185 ),
186 recovered_network: (
187 impl Sender<PublicKey = PublicKey>,
188 impl Receiver<PublicKey = PublicKey>,
189 ),
190 resolver_network: (
191 impl Sender<PublicKey = PublicKey>,
192 impl Receiver<PublicKey = PublicKey>,
193 ),
194 broadcast_network: (
195 impl Sender<PublicKey = PublicKey>,
196 impl Receiver<PublicKey = PublicKey>,
197 ),
198 backfill_network: (
199 impl Sender<PublicKey = PublicKey>,
200 impl Receiver<PublicKey = PublicKey>,
201 ),
202 ) -> Handle<()> {
203 self.context.clone().spawn(|_| {
204 self.run(
205 pending_network,
206 recovered_network,
207 resolver_network,
208 broadcast_network,
209 backfill_network,
210 )
211 })
212 }
213
214 async fn run(
215 self,
216 pending_network: (
217 impl Sender<PublicKey = PublicKey>,
218 impl Receiver<PublicKey = PublicKey>,
219 ),
220 recovered_network: (
221 impl Sender<PublicKey = PublicKey>,
222 impl Receiver<PublicKey = PublicKey>,
223 ),
224 resolver_network: (
225 impl Sender<PublicKey = PublicKey>,
226 impl Receiver<PublicKey = PublicKey>,
227 ),
228 broadcast_network: (
229 impl Sender<PublicKey = PublicKey>,
230 impl Receiver<PublicKey = PublicKey>,
231 ),
232 backfill_network: (
233 impl Sender<PublicKey = PublicKey>,
234 impl Receiver<PublicKey = PublicKey>,
235 ),
236 ) {
237 let application_handle = self.application.start(self.syncer_mailbox);
239
240 let buffer_handle = self.buffer.start(broadcast_network);
242
243 let syncer_handle = self.syncer.start(self.buffer_mailbox, backfill_network);
245
246 let consensus_handle =
251 self.consensus
252 .start(pending_network, recovered_network, resolver_network);
253
254 if let Err(e) = try_join_all(vec![
256 application_handle,
257 buffer_handle,
258 syncer_handle,
259 consensus_handle,
260 ])
261 .await
262 {
263 error!(?e, "engine failed");
264 } else {
265 warn!("engine stopped");
266 }
267 }
268}