1use crate::actors::{application, syncer};
2use alto_types::NAMESPACE;
3use commonware_consensus::threshold_simplex::{self, Engine as Consensus, Prover};
4use commonware_cryptography::{
5 bls12381::primitives::{group, poly::public, poly::Poly},
6 ed25519::PublicKey,
7 sha256::Digest,
8 Ed25519, Scheme,
9};
10use commonware_p2p::{Receiver, Sender};
11use commonware_runtime::{Blob, Clock, Handle, Metrics, Spawner, Storage};
12use commonware_storage::journal::variable::{self, Journal};
13use futures::future::try_join_all;
14use governor::clock::Clock as GClock;
15use governor::Quota;
16use rand::{CryptoRng, Rng};
17use std::time::Duration;
18use tracing::{error, warn};
19
20pub struct Config {
21 pub partition_prefix: String,
22 pub signer: Ed25519,
23 pub identity: Poly<group::Public>,
24 pub share: group::Share,
25 pub participants: Vec<PublicKey>,
26 pub mailbox_size: usize,
27 pub backfill_quota: Quota,
28
29 pub leader_timeout: Duration,
30 pub notarization_timeout: Duration,
31 pub nullify_retry: Duration,
32 pub fetch_timeout: Duration,
33 pub activity_timeout: u64,
34 pub max_fetch_count: usize,
35 pub max_fetch_size: usize,
36 pub fetch_concurrent: usize,
37 pub fetch_rate_per_peer: Quota,
38}
39
40pub struct Engine<B: Blob, E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics> {
41 context: E,
42
43 application: application::Actor<E>,
44 syncer: syncer::Actor<B, E>,
45 syncer_mailbox: syncer::Mailbox,
46 consensus: Consensus<
47 B,
48 E,
49 Ed25519,
50 Digest,
51 application::Mailbox,
52 application::Mailbox,
53 application::Mailbox,
54 application::Supervisor,
55 >,
56}
57
58impl<B: Blob, E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics> Engine<B, E> {
59 pub async fn new(context: E, cfg: Config) -> Self {
60 let public = public(&cfg.identity);
62 let (application, supervisor, application_mailbox) = application::Actor::new(
63 context.with_label("application"),
64 application::Config {
65 prover: Prover::new(public, NAMESPACE),
66 participants: cfg.participants.clone(),
67 identity: cfg.identity.clone(),
68 share: cfg.share,
69 mailbox_size: cfg.mailbox_size,
70 },
71 );
72
73 let (syncer, syncer_mailbox) = syncer::Actor::init(
75 context.with_label("syncer"),
76 syncer::Config {
77 partition_prefix: cfg.partition_prefix.clone(),
78 public_key: cfg.signer.public_key(),
79 identity: public,
80 participants: cfg.participants,
81 mailbox_size: cfg.mailbox_size,
82 backfill_quota: cfg.backfill_quota,
83 activity_timeout: cfg.activity_timeout,
84 },
85 )
86 .await;
87
88 let journal = Journal::init(
90 context.with_label("consensus_journal"),
91 variable::Config {
92 partition: format!("{}-consensus-journal", cfg.partition_prefix),
93 },
94 )
95 .await
96 .expect("failed to create journal");
97 let consensus = Consensus::new(
98 context.with_label("consensus"),
99 journal,
100 threshold_simplex::Config {
101 namespace: NAMESPACE.to_vec(),
102 crypto: cfg.signer,
103 automaton: application_mailbox.clone(),
104 relay: application_mailbox.clone(),
105 committer: application_mailbox,
106 supervisor,
107 mailbox_size: cfg.mailbox_size,
108 replay_concurrency: 1,
109 leader_timeout: cfg.leader_timeout,
110 notarization_timeout: cfg.notarization_timeout,
111 nullify_retry: cfg.nullify_retry,
112 fetch_timeout: cfg.fetch_timeout,
113 activity_timeout: cfg.activity_timeout,
114 max_fetch_count: cfg.max_fetch_count,
115 max_fetch_size: cfg.max_fetch_size,
116 fetch_concurrent: cfg.fetch_concurrent,
117 fetch_rate_per_peer: cfg.fetch_rate_per_peer,
118 },
119 );
120
121 Self {
123 context,
124
125 application,
126 syncer,
127 syncer_mailbox,
128 consensus,
129 }
130 }
131
132 pub fn start(
136 self,
137 voter_network: (
138 impl Sender<PublicKey = PublicKey>,
139 impl Receiver<PublicKey = PublicKey>,
140 ),
141 resolver_network: (
142 impl Sender<PublicKey = PublicKey>,
143 impl Receiver<PublicKey = PublicKey>,
144 ),
145 broadcast_network: (
146 impl Sender<PublicKey = PublicKey>,
147 impl Receiver<PublicKey = PublicKey>,
148 ),
149 backfill_network: (
150 impl Sender<PublicKey = PublicKey>,
151 impl Receiver<PublicKey = PublicKey>,
152 ),
153 ) -> Handle<()> {
154 self.context.clone().spawn(|_| {
155 self.run(
156 voter_network,
157 resolver_network,
158 broadcast_network,
159 backfill_network,
160 )
161 })
162 }
163
164 async fn run(
165 self,
166 voter_network: (
167 impl Sender<PublicKey = PublicKey>,
168 impl Receiver<PublicKey = PublicKey>,
169 ),
170 resolver_network: (
171 impl Sender<PublicKey = PublicKey>,
172 impl Receiver<PublicKey = PublicKey>,
173 ),
174 broadcast_network: (
175 impl Sender<PublicKey = PublicKey>,
176 impl Receiver<PublicKey = PublicKey>,
177 ),
178 backfill_network: (
179 impl Sender<PublicKey = PublicKey>,
180 impl Receiver<PublicKey = PublicKey>,
181 ),
182 ) {
183 let application_handle = self.application.start(self.syncer_mailbox);
185
186 let syncer_handle = self.syncer.start(broadcast_network, backfill_network);
188
189 let consensus_handle = self.consensus.start(voter_network, resolver_network);
191
192 if let Err(e) =
194 try_join_all(vec![application_handle, syncer_handle, consensus_handle]).await
195 {
196 error!(?e, "engine failed");
197 } else {
198 warn!("engine stopped");
199 }
200 }
201}