commonware_consensus/threshold_simplex/
engine.rs

1use super::{
2    actors::{batcher, resolver, voter},
3    config::Config,
4    types::{Activity, Context, View},
5};
6use crate::{Automaton, Relay, Reporter, ThresholdSupervisor};
7use commonware_cryptography::{
8    bls12381::primitives::{group, variant::Variant},
9    Digest, Signer,
10};
11use commonware_macros::select;
12use commonware_p2p::{Blocker, Receiver, Sender};
13use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
14use governor::clock::Clock as GClock;
15use rand::{CryptoRng, Rng};
16use tracing::debug;
17
18/// Instance of `threshold-simplex` consensus engine.
19pub struct Engine<
20    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
21    C: Signer,
22    B: Blocker<PublicKey = C::PublicKey>,
23    V: Variant,
24    D: Digest,
25    A: Automaton<Context = Context<D>, Digest = D>,
26    R: Relay<Digest = D>,
27    F: Reporter<Activity = Activity<V, D>>,
28    S: ThresholdSupervisor<
29        Index = View,
30        PublicKey = C::PublicKey,
31        Identity = V::Public,
32        Seed = V::Signature,
33        Polynomial = Vec<V::Public>,
34        Share = group::Share,
35    >,
36> {
37    context: E,
38
39    voter: voter::Actor<E, C, B, V, D, A, R, F, S>,
40    voter_mailbox: voter::Mailbox<V, D>,
41
42    batcher: batcher::Actor<E, C::PublicKey, B, V, D, F, S>,
43    batcher_mailbox: batcher::Mailbox<C::PublicKey, V, D>,
44
45    resolver: resolver::Actor<E, C::PublicKey, B, V, D, S>,
46    resolver_mailbox: resolver::Mailbox<V, D>,
47}
48
49impl<
50        E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
51        C: Signer,
52        B: Blocker<PublicKey = C::PublicKey>,
53        V: Variant,
54        D: Digest,
55        A: Automaton<Context = Context<D>, Digest = D>,
56        R: Relay<Digest = D>,
57        F: Reporter<Activity = Activity<V, D>>,
58        S: ThresholdSupervisor<
59            Seed = V::Signature,
60            Index = View,
61            Share = group::Share,
62            Polynomial = Vec<V::Public>,
63            Identity = V::Public,
64            PublicKey = C::PublicKey,
65        >,
66    > Engine<E, C, B, V, D, A, R, F, S>
67{
68    /// Create a new `threshold-simplex` consensus engine.
69    pub fn new(context: E, cfg: Config<C, B, V, D, A, R, F, S>) -> Self {
70        // Ensure configuration is valid
71        cfg.assert();
72
73        // Create batcher
74        let (batcher, batcher_mailbox) = batcher::Actor::new(
75            context.with_label("batcher"),
76            batcher::Config {
77                blocker: cfg.blocker.clone(),
78                reporter: cfg.reporter.clone(),
79                supervisor: cfg.supervisor.clone(),
80                namespace: cfg.namespace.clone(),
81                mailbox_size: cfg.mailbox_size,
82                activity_timeout: cfg.activity_timeout,
83                skip_timeout: cfg.skip_timeout,
84            },
85        );
86
87        // Create voter
88        let (voter, voter_mailbox) = voter::Actor::new(
89            context.with_label("voter"),
90            voter::Config {
91                crypto: cfg.crypto.clone(),
92                blocker: cfg.blocker.clone(),
93                automaton: cfg.automaton,
94                relay: cfg.relay,
95                reporter: cfg.reporter,
96                supervisor: cfg.supervisor.clone(),
97                partition: cfg.partition,
98                compression: cfg.compression,
99                mailbox_size: cfg.mailbox_size,
100                namespace: cfg.namespace.clone(),
101                leader_timeout: cfg.leader_timeout,
102                notarization_timeout: cfg.notarization_timeout,
103                nullify_retry: cfg.nullify_retry,
104                activity_timeout: cfg.activity_timeout,
105                replay_buffer: cfg.replay_buffer,
106                write_buffer: cfg.write_buffer,
107                buffer_pool: cfg.buffer_pool,
108            },
109        );
110
111        // Create resolver
112        let (resolver, resolver_mailbox) = resolver::Actor::new(
113            context.with_label("resolver"),
114            resolver::Config {
115                blocker: cfg.blocker,
116                crypto: cfg.crypto.public_key(),
117                supervisor: cfg.supervisor,
118                mailbox_size: cfg.mailbox_size,
119                namespace: cfg.namespace,
120                activity_timeout: cfg.activity_timeout,
121                fetch_timeout: cfg.fetch_timeout,
122                fetch_concurrent: cfg.fetch_concurrent,
123                max_fetch_count: cfg.max_fetch_count,
124                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
125            },
126        );
127
128        // Return the engine
129        Self {
130            context,
131
132            voter,
133            voter_mailbox,
134
135            batcher,
136            batcher_mailbox,
137
138            resolver,
139            resolver_mailbox,
140        }
141    }
142
143    /// Start the `threshold-simplex` consensus engine.
144    ///
145    /// This will also rebuild the state of the engine from provided `Journal`.
146    pub fn start(
147        self,
148        pending_network: (
149            impl Sender<PublicKey = C::PublicKey>,
150            impl Receiver<PublicKey = C::PublicKey>,
151        ),
152        recovered_network: (
153            impl Sender<PublicKey = C::PublicKey>,
154            impl Receiver<PublicKey = C::PublicKey>,
155        ),
156        resolver_network: (
157            impl Sender<PublicKey = C::PublicKey>,
158            impl Receiver<PublicKey = C::PublicKey>,
159        ),
160    ) -> Handle<()> {
161        self.context
162            .clone()
163            .spawn(|_| self.run(pending_network, recovered_network, resolver_network))
164    }
165
166    async fn run(
167        self,
168        pending_network: (
169            impl Sender<PublicKey = C::PublicKey>,
170            impl Receiver<PublicKey = C::PublicKey>,
171        ),
172        recovered_network: (
173            impl Sender<PublicKey = C::PublicKey>,
174            impl Receiver<PublicKey = C::PublicKey>,
175        ),
176        resolver_network: (
177            impl Sender<PublicKey = C::PublicKey>,
178            impl Receiver<PublicKey = C::PublicKey>,
179        ),
180    ) {
181        // Start the batcher
182        let (pending_sender, pending_receiver) = pending_network;
183        let mut batcher_task = self
184            .batcher
185            .start(self.voter_mailbox.clone(), pending_receiver);
186
187        // Start the resolver
188        let (resolver_sender, resolver_receiver) = resolver_network;
189        let mut resolver_task =
190            self.resolver
191                .start(self.voter_mailbox, resolver_sender, resolver_receiver);
192
193        // Start the voter
194        let (recovered_sender, recovered_receiver) = recovered_network;
195        let mut voter_task = self.voter.start(
196            self.batcher_mailbox,
197            self.resolver_mailbox,
198            pending_sender,
199            recovered_sender,
200            recovered_receiver,
201        );
202
203        // Wait for the resolver or voter to finish
204        select! {
205            _ = &mut voter_task => {
206                debug!("voter finished");
207                resolver_task.abort();
208                batcher_task.abort();
209            },
210            _ = &mut batcher_task => {
211                debug!("batcher finished");
212                voter_task.abort();
213                resolver_task.abort();
214            },
215            _ = &mut resolver_task => {
216                debug!("resolver finished");
217                voter_task.abort();
218                batcher_task.abort();
219            },
220        }
221    }
222}