alto_chain/
engine.rs

1use crate::{
2    actors::{application, syncer},
3    Indexer,
4};
5use alto_types::NAMESPACE;
6use commonware_consensus::threshold_simplex::{self, Engine as Consensus, Prover};
7use commonware_cryptography::{
8    bls12381::primitives::{group, poly::public, poly::Poly},
9    ed25519::PublicKey,
10    sha256::Digest,
11    Ed25519, Scheme,
12};
13use commonware_p2p::{Receiver, Sender};
14use commonware_runtime::{Blob, Clock, Handle, Metrics, Spawner, Storage};
15use commonware_storage::journal::variable::{self, Journal};
16use futures::future::try_join_all;
17use governor::clock::Clock as GClock;
18use governor::Quota;
19use rand::{CryptoRng, Rng};
20use std::time::Duration;
21use tracing::{error, warn};
22
23pub struct Config<I: Indexer> {
24    pub partition_prefix: String,
25    pub signer: Ed25519,
26    pub identity: Poly<group::Public>,
27    pub share: group::Share,
28    pub participants: Vec<PublicKey>,
29    pub mailbox_size: usize,
30    pub backfill_quota: Quota,
31
32    pub leader_timeout: Duration,
33    pub notarization_timeout: Duration,
34    pub nullify_retry: Duration,
35    pub fetch_timeout: Duration,
36    pub activity_timeout: u64,
37    pub max_fetch_count: usize,
38    pub max_fetch_size: usize,
39    pub fetch_concurrent: usize,
40    pub fetch_rate_per_peer: Quota,
41
42    pub indexer: Option<I>,
43}
44
45pub struct Engine<
46    B: Blob,
47    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics,
48    I: Indexer,
49> {
50    context: E,
51
52    application: application::Actor<E>,
53    syncer: syncer::Actor<B, E, I>,
54    syncer_mailbox: syncer::Mailbox,
55    consensus: Consensus<
56        B,
57        E,
58        Ed25519,
59        Digest,
60        application::Mailbox,
61        application::Mailbox,
62        application::Mailbox,
63        application::Supervisor,
64    >,
65}
66
67impl<B: Blob, E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics, I: Indexer>
68    Engine<B, E, I>
69{
70    pub async fn new(context: E, cfg: Config<I>) -> Self {
71        // Create the application
72        let public = public(&cfg.identity);
73        let (application, supervisor, application_mailbox) = application::Actor::new(
74            context.with_label("application"),
75            application::Config {
76                prover: Prover::new(public, NAMESPACE),
77                participants: cfg.participants.clone(),
78                identity: cfg.identity.clone(),
79                share: cfg.share,
80                mailbox_size: cfg.mailbox_size,
81            },
82        );
83
84        // Create the syncer
85        let (syncer, syncer_mailbox) = syncer::Actor::init(
86            context.with_label("syncer"),
87            syncer::Config {
88                partition_prefix: cfg.partition_prefix.clone(),
89                public_key: cfg.signer.public_key(),
90                identity: public,
91                participants: cfg.participants,
92                mailbox_size: cfg.mailbox_size,
93                backfill_quota: cfg.backfill_quota,
94                activity_timeout: cfg.activity_timeout,
95                indexer: cfg.indexer,
96            },
97        )
98        .await;
99
100        // Create the consensus engine
101        let journal = Journal::init(
102            context.with_label("consensus_journal"),
103            variable::Config {
104                partition: format!("{}-consensus-journal", cfg.partition_prefix),
105            },
106        )
107        .await
108        .expect("failed to create journal");
109        let consensus = Consensus::new(
110            context.with_label("consensus"),
111            journal,
112            threshold_simplex::Config {
113                namespace: NAMESPACE.to_vec(),
114                crypto: cfg.signer,
115                automaton: application_mailbox.clone(),
116                relay: application_mailbox.clone(),
117                committer: application_mailbox,
118                supervisor,
119                mailbox_size: cfg.mailbox_size,
120                replay_concurrency: 1,
121                leader_timeout: cfg.leader_timeout,
122                notarization_timeout: cfg.notarization_timeout,
123                nullify_retry: cfg.nullify_retry,
124                fetch_timeout: cfg.fetch_timeout,
125                activity_timeout: cfg.activity_timeout,
126                max_fetch_count: cfg.max_fetch_count,
127                max_fetch_size: cfg.max_fetch_size,
128                fetch_concurrent: cfg.fetch_concurrent,
129                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
130            },
131        );
132
133        // Return the engine
134        Self {
135            context,
136
137            application,
138            syncer,
139            syncer_mailbox,
140            consensus,
141        }
142    }
143
144    /// Start the `simplex` consensus engine.
145    ///
146    /// This will also rebuild the state of the engine from provided `Journal`.
147    pub fn start(
148        self,
149        voter_network: (
150            impl Sender<PublicKey = PublicKey>,
151            impl Receiver<PublicKey = PublicKey>,
152        ),
153        resolver_network: (
154            impl Sender<PublicKey = PublicKey>,
155            impl Receiver<PublicKey = PublicKey>,
156        ),
157        broadcast_network: (
158            impl Sender<PublicKey = PublicKey>,
159            impl Receiver<PublicKey = PublicKey>,
160        ),
161        backfill_network: (
162            impl Sender<PublicKey = PublicKey>,
163            impl Receiver<PublicKey = PublicKey>,
164        ),
165    ) -> Handle<()> {
166        self.context.clone().spawn(|_| {
167            self.run(
168                voter_network,
169                resolver_network,
170                broadcast_network,
171                backfill_network,
172            )
173        })
174    }
175
176    async fn run(
177        self,
178        voter_network: (
179            impl Sender<PublicKey = PublicKey>,
180            impl Receiver<PublicKey = PublicKey>,
181        ),
182        resolver_network: (
183            impl Sender<PublicKey = PublicKey>,
184            impl Receiver<PublicKey = PublicKey>,
185        ),
186        broadcast_network: (
187            impl Sender<PublicKey = PublicKey>,
188            impl Receiver<PublicKey = PublicKey>,
189        ),
190        backfill_network: (
191            impl Sender<PublicKey = PublicKey>,
192            impl Receiver<PublicKey = PublicKey>,
193        ),
194    ) {
195        // Start the application
196        let application_handle = self.application.start(self.syncer_mailbox);
197
198        // Start the syncer
199        let syncer_handle = self.syncer.start(broadcast_network, backfill_network);
200
201        // Start consensus
202        let consensus_handle = self.consensus.start(voter_network, resolver_network);
203
204        // Wait for any actor to finish
205        if let Err(e) =
206            try_join_all(vec![application_handle, syncer_handle, consensus_handle]).await
207        {
208            error!(?e, "engine failed");
209        } else {
210            warn!("engine stopped");
211        }
212    }
213}