commonware_consensus/threshold_simplex/
engine.rs

1use super::{
2    actors::{batcher, resolver, voter},
3    config::Config,
4    types::{Activity, Context, View},
5};
6use crate::{Automaton, Relay, Reporter, ThresholdSupervisor};
7use commonware_cryptography::{
8    bls12381::primitives::{group, variant::Variant},
9    Digest, Signer,
10};
11use commonware_macros::select;
12use commonware_p2p::{Blocker, Receiver, Sender};
13use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
14use governor::clock::Clock as GClock;
15use rand::{CryptoRng, Rng};
16use tracing::debug;
17
18/// Instance of `threshold-simplex` consensus engine.
19pub struct Engine<
20    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
21    C: Signer,
22    B: Blocker<PublicKey = C::PublicKey>,
23    V: Variant,
24    D: Digest,
25    A: Automaton<Context = Context<D>, Digest = D>,
26    R: Relay<Digest = D>,
27    F: Reporter<Activity = Activity<V, D>>,
28    S: ThresholdSupervisor<
29        Index = View,
30        PublicKey = C::PublicKey,
31        Identity = V::Public,
32        Seed = V::Signature,
33        Polynomial = Vec<V::Public>,
34        Share = group::Share,
35    >,
36> {
37    context: E,
38
39    voter: voter::Actor<E, C, B, V, D, A, R, F, S>,
40    voter_mailbox: voter::Mailbox<V, D>,
41
42    batcher: batcher::Actor<E, C::PublicKey, B, V, D, F, S>,
43    batcher_mailbox: batcher::Mailbox<C::PublicKey, V, D>,
44
45    resolver: resolver::Actor<E, C::PublicKey, B, V, D, S>,
46    resolver_mailbox: resolver::Mailbox<V, D>,
47}
48
49impl<
50        E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
51        C: Signer,
52        B: Blocker<PublicKey = C::PublicKey>,
53        V: Variant,
54        D: Digest,
55        A: Automaton<Context = Context<D>, Digest = D>,
56        R: Relay<Digest = D>,
57        F: Reporter<Activity = Activity<V, D>>,
58        S: ThresholdSupervisor<
59            Seed = V::Signature,
60            Index = View,
61            Share = group::Share,
62            Polynomial = Vec<V::Public>,
63            Identity = V::Public,
64            PublicKey = C::PublicKey,
65        >,
66    > Engine<E, C, B, V, D, A, R, F, S>
67{
68    /// Create a new `threshold-simplex` consensus engine.
69    pub fn new(context: E, cfg: Config<C, B, V, D, A, R, F, S>) -> Self {
70        // Ensure configuration is valid
71        cfg.assert();
72
73        // Create batcher
74        let (batcher, batcher_mailbox) = batcher::Actor::new(
75            context.with_label("batcher"),
76            batcher::Config {
77                blocker: cfg.blocker.clone(),
78                reporter: cfg.reporter.clone(),
79                supervisor: cfg.supervisor.clone(),
80                namespace: cfg.namespace.clone(),
81                mailbox_size: cfg.mailbox_size,
82                activity_timeout: cfg.activity_timeout,
83                skip_timeout: cfg.skip_timeout,
84            },
85        );
86
87        // Create voter
88        let (voter, voter_mailbox) = voter::Actor::new(
89            context.with_label("voter"),
90            voter::Config {
91                crypto: cfg.crypto.clone(),
92                blocker: cfg.blocker.clone(),
93                automaton: cfg.automaton,
94                relay: cfg.relay,
95                reporter: cfg.reporter,
96                supervisor: cfg.supervisor.clone(),
97                partition: cfg.partition,
98                mailbox_size: cfg.mailbox_size,
99                namespace: cfg.namespace.clone(),
100                leader_timeout: cfg.leader_timeout,
101                notarization_timeout: cfg.notarization_timeout,
102                nullify_retry: cfg.nullify_retry,
103                activity_timeout: cfg.activity_timeout,
104                replay_buffer: cfg.replay_buffer,
105                write_buffer: cfg.write_buffer,
106                buffer_pool: cfg.buffer_pool,
107            },
108        );
109
110        // Create resolver
111        let (resolver, resolver_mailbox) = resolver::Actor::new(
112            context.with_label("resolver"),
113            resolver::Config {
114                blocker: cfg.blocker,
115                crypto: cfg.crypto.public_key(),
116                supervisor: cfg.supervisor,
117                mailbox_size: cfg.mailbox_size,
118                namespace: cfg.namespace,
119                activity_timeout: cfg.activity_timeout,
120                fetch_timeout: cfg.fetch_timeout,
121                fetch_concurrent: cfg.fetch_concurrent,
122                max_fetch_count: cfg.max_fetch_count,
123                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
124            },
125        );
126
127        // Return the engine
128        Self {
129            context,
130
131            voter,
132            voter_mailbox,
133
134            batcher,
135            batcher_mailbox,
136
137            resolver,
138            resolver_mailbox,
139        }
140    }
141
142    /// Start the `threshold-simplex` consensus engine.
143    ///
144    /// This will also rebuild the state of the engine from provided `Journal`.
145    pub fn start(
146        self,
147        pending_network: (
148            impl Sender<PublicKey = C::PublicKey>,
149            impl Receiver<PublicKey = C::PublicKey>,
150        ),
151        recovered_network: (
152            impl Sender<PublicKey = C::PublicKey>,
153            impl Receiver<PublicKey = C::PublicKey>,
154        ),
155        resolver_network: (
156            impl Sender<PublicKey = C::PublicKey>,
157            impl Receiver<PublicKey = C::PublicKey>,
158        ),
159    ) -> Handle<()> {
160        self.context
161            .clone()
162            .spawn(|_| self.run(pending_network, recovered_network, resolver_network))
163    }
164
165    async fn run(
166        self,
167        pending_network: (
168            impl Sender<PublicKey = C::PublicKey>,
169            impl Receiver<PublicKey = C::PublicKey>,
170        ),
171        recovered_network: (
172            impl Sender<PublicKey = C::PublicKey>,
173            impl Receiver<PublicKey = C::PublicKey>,
174        ),
175        resolver_network: (
176            impl Sender<PublicKey = C::PublicKey>,
177            impl Receiver<PublicKey = C::PublicKey>,
178        ),
179    ) {
180        // Start the batcher
181        let (pending_sender, pending_receiver) = pending_network;
182        let mut batcher_task = self
183            .batcher
184            .start(self.voter_mailbox.clone(), pending_receiver);
185
186        // Start the resolver
187        let (resolver_sender, resolver_receiver) = resolver_network;
188        let mut resolver_task =
189            self.resolver
190                .start(self.voter_mailbox, resolver_sender, resolver_receiver);
191
192        // Start the voter
193        let (recovered_sender, recovered_receiver) = recovered_network;
194        let mut voter_task = self.voter.start(
195            self.batcher_mailbox,
196            self.resolver_mailbox,
197            pending_sender,
198            recovered_sender,
199            recovered_receiver,
200        );
201
202        // Wait for the resolver or voter to finish
203        select! {
204            _ = &mut voter_task => {
205                debug!("voter finished");
206                resolver_task.abort();
207                batcher_task.abort();
208            },
209            _ = &mut batcher_task => {
210                debug!("batcher finished");
211                voter_task.abort();
212                resolver_task.abort();
213            },
214            _ = &mut resolver_task => {
215                debug!("resolver finished");
216                voter_task.abort();
217                batcher_task.abort();
218            },
219        }
220    }
221}