1use crate::{
2 actors::{application, syncer},
3 Indexer,
4};
5use alto_types::NAMESPACE;
6use commonware_consensus::threshold_simplex::{self, Engine as Consensus, Prover};
7use commonware_cryptography::{
8 bls12381::primitives::{group, poly::public, poly::Poly},
9 ed25519::PublicKey,
10 sha256::Digest,
11 Ed25519, Scheme,
12};
13use commonware_p2p::{Receiver, Sender};
14use commonware_runtime::{Blob, Clock, Handle, Metrics, Spawner, Storage};
15use commonware_storage::journal::variable::{self, Journal};
16use futures::future::try_join_all;
17use governor::clock::Clock as GClock;
18use governor::Quota;
19use rand::{CryptoRng, Rng};
20use std::time::Duration;
21use tracing::{error, warn};
22
23pub struct Config<I: Indexer> {
24 pub partition_prefix: String,
25 pub signer: Ed25519,
26 pub identity: Poly<group::Public>,
27 pub share: group::Share,
28 pub participants: Vec<PublicKey>,
29 pub mailbox_size: usize,
30 pub backfill_quota: Quota,
31
32 pub leader_timeout: Duration,
33 pub notarization_timeout: Duration,
34 pub nullify_retry: Duration,
35 pub fetch_timeout: Duration,
36 pub activity_timeout: u64,
37 pub max_fetch_count: usize,
38 pub max_fetch_size: usize,
39 pub fetch_concurrent: usize,
40 pub fetch_rate_per_peer: Quota,
41
42 pub indexer: Option<I>,
43}
44
45pub struct Engine<
46 B: Blob,
47 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics,
48 I: Indexer,
49> {
50 context: E,
51
52 application: application::Actor<E>,
53 syncer: syncer::Actor<B, E, I>,
54 syncer_mailbox: syncer::Mailbox,
55 consensus: Consensus<
56 B,
57 E,
58 Ed25519,
59 Digest,
60 application::Mailbox,
61 application::Mailbox,
62 application::Mailbox,
63 application::Supervisor,
64 >,
65}
66
67impl<B: Blob, E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics, I: Indexer>
68 Engine<B, E, I>
69{
70 pub async fn new(context: E, cfg: Config<I>) -> Self {
71 let public = public(&cfg.identity);
73 let (application, supervisor, application_mailbox) = application::Actor::new(
74 context.with_label("application"),
75 application::Config {
76 prover: Prover::new(public, NAMESPACE),
77 participants: cfg.participants.clone(),
78 identity: cfg.identity.clone(),
79 share: cfg.share,
80 mailbox_size: cfg.mailbox_size,
81 },
82 );
83
84 let (syncer, syncer_mailbox) = syncer::Actor::init(
86 context.with_label("syncer"),
87 syncer::Config {
88 partition_prefix: cfg.partition_prefix.clone(),
89 public_key: cfg.signer.public_key(),
90 identity: public,
91 participants: cfg.participants,
92 mailbox_size: cfg.mailbox_size,
93 backfill_quota: cfg.backfill_quota,
94 activity_timeout: cfg.activity_timeout,
95 indexer: cfg.indexer,
96 },
97 )
98 .await;
99
100 let journal = Journal::init(
102 context.with_label("consensus_journal"),
103 variable::Config {
104 partition: format!("{}-consensus-journal", cfg.partition_prefix),
105 },
106 )
107 .await
108 .expect("failed to create journal");
109 let consensus = Consensus::new(
110 context.with_label("consensus"),
111 journal,
112 threshold_simplex::Config {
113 namespace: NAMESPACE.to_vec(),
114 crypto: cfg.signer,
115 automaton: application_mailbox.clone(),
116 relay: application_mailbox.clone(),
117 committer: application_mailbox,
118 supervisor,
119 mailbox_size: cfg.mailbox_size,
120 replay_concurrency: 1,
121 leader_timeout: cfg.leader_timeout,
122 notarization_timeout: cfg.notarization_timeout,
123 nullify_retry: cfg.nullify_retry,
124 fetch_timeout: cfg.fetch_timeout,
125 activity_timeout: cfg.activity_timeout,
126 max_fetch_count: cfg.max_fetch_count,
127 max_fetch_size: cfg.max_fetch_size,
128 fetch_concurrent: cfg.fetch_concurrent,
129 fetch_rate_per_peer: cfg.fetch_rate_per_peer,
130 },
131 );
132
133 Self {
135 context,
136
137 application,
138 syncer,
139 syncer_mailbox,
140 consensus,
141 }
142 }
143
144 pub fn start(
148 self,
149 voter_network: (
150 impl Sender<PublicKey = PublicKey>,
151 impl Receiver<PublicKey = PublicKey>,
152 ),
153 resolver_network: (
154 impl Sender<PublicKey = PublicKey>,
155 impl Receiver<PublicKey = PublicKey>,
156 ),
157 broadcast_network: (
158 impl Sender<PublicKey = PublicKey>,
159 impl Receiver<PublicKey = PublicKey>,
160 ),
161 backfill_network: (
162 impl Sender<PublicKey = PublicKey>,
163 impl Receiver<PublicKey = PublicKey>,
164 ),
165 ) -> Handle<()> {
166 self.context.clone().spawn(|_| {
167 self.run(
168 voter_network,
169 resolver_network,
170 broadcast_network,
171 backfill_network,
172 )
173 })
174 }
175
176 async fn run(
177 self,
178 voter_network: (
179 impl Sender<PublicKey = PublicKey>,
180 impl Receiver<PublicKey = PublicKey>,
181 ),
182 resolver_network: (
183 impl Sender<PublicKey = PublicKey>,
184 impl Receiver<PublicKey = PublicKey>,
185 ),
186 broadcast_network: (
187 impl Sender<PublicKey = PublicKey>,
188 impl Receiver<PublicKey = PublicKey>,
189 ),
190 backfill_network: (
191 impl Sender<PublicKey = PublicKey>,
192 impl Receiver<PublicKey = PublicKey>,
193 ),
194 ) {
195 let application_handle = self.application.start(self.syncer_mailbox);
197
198 let syncer_handle = self.syncer.start(broadcast_network, backfill_network);
200
201 let consensus_handle = self.consensus.start(voter_network, resolver_network);
203
204 if let Err(e) =
206 try_join_all(vec![application_handle, syncer_handle, consensus_handle]).await
207 {
208 error!(?e, "engine failed");
209 } else {
210 warn!("engine stopped");
211 }
212 }
213}