alto_chain/
engine.rs

1use crate::actors::{application, syncer};
2use alto_types::NAMESPACE;
3use commonware_consensus::threshold_simplex::{self, Engine as Consensus, Prover};
4use commonware_cryptography::{
5    bls12381::primitives::{group, poly::public, poly::Poly},
6    ed25519::PublicKey,
7    sha256::Digest,
8    Ed25519, Scheme,
9};
10use commonware_p2p::{Receiver, Sender};
11use commonware_runtime::{Blob, Clock, Handle, Metrics, Spawner, Storage};
12use commonware_storage::journal::variable::{self, Journal};
13use futures::future::try_join_all;
14use governor::clock::Clock as GClock;
15use governor::Quota;
16use rand::{CryptoRng, Rng};
17use std::time::Duration;
18use tracing::{error, warn};
19
20pub struct Config {
21    pub partition_prefix: String,
22    pub signer: Ed25519,
23    pub identity: Poly<group::Public>,
24    pub share: group::Share,
25    pub participants: Vec<PublicKey>,
26    pub mailbox_size: usize,
27    pub backfill_quota: Quota,
28
29    pub leader_timeout: Duration,
30    pub notarization_timeout: Duration,
31    pub nullify_retry: Duration,
32    pub fetch_timeout: Duration,
33    pub activity_timeout: u64,
34    pub max_fetch_count: usize,
35    pub max_fetch_size: usize,
36    pub fetch_concurrent: usize,
37    pub fetch_rate_per_peer: Quota,
38}
39
40pub struct Engine<B: Blob, E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics> {
41    context: E,
42
43    application: application::Actor<E>,
44    syncer: syncer::Actor<B, E>,
45    syncer_mailbox: syncer::Mailbox,
46    consensus: Consensus<
47        B,
48        E,
49        Ed25519,
50        Digest,
51        application::Mailbox,
52        application::Mailbox,
53        application::Mailbox,
54        application::Supervisor,
55    >,
56}
57
58impl<B: Blob, E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics> Engine<B, E> {
59    pub async fn new(context: E, cfg: Config) -> Self {
60        // Create the application
61        let public = public(&cfg.identity);
62        let (application, supervisor, application_mailbox) = application::Actor::new(
63            context.with_label("application"),
64            application::Config {
65                prover: Prover::new(public, NAMESPACE),
66                participants: cfg.participants.clone(),
67                identity: cfg.identity.clone(),
68                share: cfg.share,
69                mailbox_size: cfg.mailbox_size,
70            },
71        );
72
73        // Create the syncer
74        let (syncer, syncer_mailbox) = syncer::Actor::init(
75            context.with_label("syncer"),
76            syncer::Config {
77                partition_prefix: cfg.partition_prefix.clone(),
78                public_key: cfg.signer.public_key(),
79                identity: public,
80                participants: cfg.participants,
81                mailbox_size: cfg.mailbox_size,
82                backfill_quota: cfg.backfill_quota,
83                activity_timeout: cfg.activity_timeout,
84            },
85        )
86        .await;
87
88        // Create the consensus engine
89        let journal = Journal::init(
90            context.with_label("consensus_journal"),
91            variable::Config {
92                partition: format!("{}-consensus-journal", cfg.partition_prefix),
93            },
94        )
95        .await
96        .expect("failed to create journal");
97        let consensus = Consensus::new(
98            context.with_label("consensus"),
99            journal,
100            threshold_simplex::Config {
101                namespace: NAMESPACE.to_vec(),
102                crypto: cfg.signer,
103                automaton: application_mailbox.clone(),
104                relay: application_mailbox.clone(),
105                committer: application_mailbox,
106                supervisor,
107                mailbox_size: cfg.mailbox_size,
108                replay_concurrency: 1,
109                leader_timeout: cfg.leader_timeout,
110                notarization_timeout: cfg.notarization_timeout,
111                nullify_retry: cfg.nullify_retry,
112                fetch_timeout: cfg.fetch_timeout,
113                activity_timeout: cfg.activity_timeout,
114                max_fetch_count: cfg.max_fetch_count,
115                max_fetch_size: cfg.max_fetch_size,
116                fetch_concurrent: cfg.fetch_concurrent,
117                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
118            },
119        );
120
121        // Return the engine
122        Self {
123            context,
124
125            application,
126            syncer,
127            syncer_mailbox,
128            consensus,
129        }
130    }
131
132    /// Start the `simplex` consensus engine.
133    ///
134    /// This will also rebuild the state of the engine from provided `Journal`.
135    pub fn start(
136        self,
137        voter_network: (
138            impl Sender<PublicKey = PublicKey>,
139            impl Receiver<PublicKey = PublicKey>,
140        ),
141        resolver_network: (
142            impl Sender<PublicKey = PublicKey>,
143            impl Receiver<PublicKey = PublicKey>,
144        ),
145        broadcast_network: (
146            impl Sender<PublicKey = PublicKey>,
147            impl Receiver<PublicKey = PublicKey>,
148        ),
149        backfill_network: (
150            impl Sender<PublicKey = PublicKey>,
151            impl Receiver<PublicKey = PublicKey>,
152        ),
153    ) -> Handle<()> {
154        self.context.clone().spawn(|_| {
155            self.run(
156                voter_network,
157                resolver_network,
158                broadcast_network,
159                backfill_network,
160            )
161        })
162    }
163
164    async fn run(
165        self,
166        voter_network: (
167            impl Sender<PublicKey = PublicKey>,
168            impl Receiver<PublicKey = PublicKey>,
169        ),
170        resolver_network: (
171            impl Sender<PublicKey = PublicKey>,
172            impl Receiver<PublicKey = PublicKey>,
173        ),
174        broadcast_network: (
175            impl Sender<PublicKey = PublicKey>,
176            impl Receiver<PublicKey = PublicKey>,
177        ),
178        backfill_network: (
179            impl Sender<PublicKey = PublicKey>,
180            impl Receiver<PublicKey = PublicKey>,
181        ),
182    ) {
183        // Start the application
184        let application_handle = self.application.start(self.syncer_mailbox);
185
186        // Start the syncer
187        let syncer_handle = self.syncer.start(broadcast_network, backfill_network);
188
189        // Start consensus
190        let consensus_handle = self.consensus.start(voter_network, resolver_network);
191
192        // Wait for any actor to finish
193        if let Err(e) =
194            try_join_all(vec![application_handle, syncer_handle, consensus_handle]).await
195        {
196            error!(?e, "engine failed");
197        } else {
198            warn!("engine stopped");
199        }
200    }
201}