commonware_consensus/threshold_simplex/
engine.rs

1use super::{
2    actors::{resolver, voter},
3    config::Config,
4    Context, View,
5};
6use crate::{Automaton, Committer, Relay, ThresholdSupervisor};
7use commonware_cryptography::{
8    bls12381::primitives::{group, poly},
9    Scheme,
10};
11use commonware_macros::select;
12use commonware_p2p::{Receiver, Sender};
13use commonware_runtime::{Blob, Clock, Handle, Metrics, Spawner, Storage};
14use commonware_storage::journal::variable::Journal;
15use commonware_utils::Array;
16use governor::clock::Clock as GClock;
17use rand::{CryptoRng, Rng};
18use tracing::debug;
19
20/// Instance of `threshold-simplex` consensus engine.
21pub struct Engine<
22    B: Blob,
23    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics,
24    C: Scheme,
25    D: Array,
26    A: Automaton<Context = Context<D>, Digest = D>,
27    R: Relay<Digest = D>,
28    F: Committer<Digest = D>,
29    S: ThresholdSupervisor<
30        Seed = group::Signature,
31        Index = View,
32        Share = group::Share,
33        Identity = poly::Public,
34        PublicKey = C::PublicKey,
35    >,
36> {
37    context: E,
38
39    voter: voter::Actor<B, E, C, D, A, R, F, S>,
40    voter_mailbox: voter::Mailbox<D>,
41    resolver: resolver::Actor<E, C, D, S>,
42    resolver_mailbox: resolver::Mailbox,
43}
44
45impl<
46        B: Blob,
47        E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics,
48        C: Scheme,
49        D: Array,
50        A: Automaton<Context = Context<D>, Digest = D>,
51        R: Relay<Digest = D>,
52        F: Committer<Digest = D>,
53        S: ThresholdSupervisor<
54            Seed = group::Signature,
55            Index = View,
56            Share = group::Share,
57            Identity = poly::Public,
58            PublicKey = C::PublicKey,
59        >,
60    > Engine<B, E, C, D, A, R, F, S>
61{
62    /// Create a new `threshold-simplex` consensus engine.
63    pub fn new(context: E, journal: Journal<B, E>, cfg: Config<C, D, A, R, F, S>) -> Self {
64        // Ensure configuration is valid
65        cfg.assert();
66
67        // Create voter
68        let (voter, voter_mailbox) = voter::Actor::new(
69            context.clone(),
70            journal,
71            voter::Config {
72                crypto: cfg.crypto.clone(),
73                automaton: cfg.automaton,
74                relay: cfg.relay,
75                committer: cfg.committer,
76                supervisor: cfg.supervisor.clone(),
77                mailbox_size: cfg.mailbox_size,
78                namespace: cfg.namespace.clone(),
79                leader_timeout: cfg.leader_timeout,
80                notarization_timeout: cfg.notarization_timeout,
81                nullify_retry: cfg.nullify_retry,
82                activity_timeout: cfg.activity_timeout,
83                replay_concurrency: cfg.replay_concurrency,
84            },
85        );
86
87        // Create resolver
88        let (resolver, resolver_mailbox) = resolver::Actor::new(
89            context.clone(),
90            resolver::Config {
91                crypto: cfg.crypto,
92                supervisor: cfg.supervisor,
93                mailbox_size: cfg.mailbox_size,
94                namespace: cfg.namespace,
95                activity_timeout: cfg.activity_timeout,
96                fetch_timeout: cfg.fetch_timeout,
97                fetch_concurrent: cfg.fetch_concurrent,
98                max_fetch_count: cfg.max_fetch_count,
99                max_fetch_size: cfg.max_fetch_size,
100                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
101            },
102        );
103
104        // Return the engine
105        Self {
106            context,
107
108            voter,
109            voter_mailbox,
110            resolver,
111            resolver_mailbox,
112        }
113    }
114
115    /// Start the `threshold-simplex` consensus engine.
116    ///
117    /// This will also rebuild the state of the engine from provided `Journal`.
118    pub fn start(
119        self,
120        voter_network: (
121            impl Sender<PublicKey = C::PublicKey>,
122            impl Receiver<PublicKey = C::PublicKey>,
123        ),
124        resolver_network: (
125            impl Sender<PublicKey = C::PublicKey>,
126            impl Receiver<PublicKey = C::PublicKey>,
127        ),
128    ) -> Handle<()> {
129        self.context
130            .clone()
131            .spawn(|_| self.run(voter_network, resolver_network))
132    }
133
134    async fn run(
135        self,
136        voter_network: (
137            impl Sender<PublicKey = C::PublicKey>,
138            impl Receiver<PublicKey = C::PublicKey>,
139        ),
140        resolver_network: (
141            impl Sender<PublicKey = C::PublicKey>,
142            impl Receiver<PublicKey = C::PublicKey>,
143        ),
144    ) {
145        // Start the voter
146        let (voter_sender, voter_receiver) = voter_network;
147        let mut voter_task = self
148            .voter
149            .start(self.resolver_mailbox, voter_sender, voter_receiver);
150
151        // Start the resolver
152        let (resolver_sender, resolver_receiver) = resolver_network;
153        let mut resolver_task =
154            self.resolver
155                .start(self.voter_mailbox, resolver_sender, resolver_receiver);
156
157        // Wait for the resolver or voter to finish
158        select! {
159            _ = &mut voter_task => {
160                debug!("voter finished");
161                resolver_task.abort();
162            },
163            _ = &mut resolver_task => {
164                debug!("resolver finished");
165                voter_task.abort();
166            },
167        }
168    }
169}