commonware_consensus/simplex/
engine.rs

1use super::{
2    actors::{batcher, resolver, voter},
3    config::Config,
4    elector::Config as Elector,
5    types::{Activity, Context},
6};
7use crate::{simplex::scheme::Scheme, CertifiableAutomaton, Relay, Reporter};
8use commonware_cryptography::Digest;
9use commonware_macros::select;
10use commonware_p2p::{Blocker, Receiver, Sender};
11use commonware_parallel::Strategy;
12use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage};
13use rand_core::CryptoRngCore;
14use tracing::debug;
15
16/// Instance of `simplex` consensus engine.
17pub struct Engine<
18    E: Clock + CryptoRngCore + Spawner + Storage + Metrics,
19    S: Scheme<D>,
20    L: Elector<S>,
21    B: Blocker<PublicKey = S::PublicKey>,
22    D: Digest,
23    A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
24    R: Relay<Digest = D>,
25    F: Reporter<Activity = Activity<S, D>>,
26    T: Strategy,
27> {
28    context: ContextCell<E>,
29
30    voter: voter::Actor<E, S, L, B, D, A, R, F>,
31    voter_mailbox: voter::Mailbox<S, D>,
32
33    batcher: batcher::Actor<E, S, B, D, F, T>,
34    batcher_mailbox: batcher::Mailbox<S, D>,
35
36    resolver: resolver::Actor<E, S, B, D, T>,
37    resolver_mailbox: resolver::Mailbox<S, D>,
38}
39
40impl<
41        E: Clock + CryptoRngCore + Spawner + Storage + Metrics,
42        S: Scheme<D>,
43        L: Elector<S>,
44        B: Blocker<PublicKey = S::PublicKey>,
45        D: Digest,
46        A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
47        R: Relay<Digest = D>,
48        F: Reporter<Activity = Activity<S, D>>,
49        T: Strategy,
50    > Engine<E, S, L, B, D, A, R, F, T>
51{
52    /// Create a new `simplex` consensus engine.
53    pub fn new(context: E, cfg: Config<S, L, B, D, A, R, F, T>) -> Self {
54        // Ensure configuration is valid
55        cfg.assert();
56
57        // Create batcher
58        let (batcher, batcher_mailbox) = batcher::Actor::new(
59            context.with_label("batcher"),
60            batcher::Config {
61                scheme: cfg.scheme.clone(),
62                blocker: cfg.blocker.clone(),
63                reporter: cfg.reporter.clone(),
64                strategy: cfg.strategy.clone(),
65                epoch: cfg.epoch,
66                mailbox_size: cfg.mailbox_size,
67                activity_timeout: cfg.activity_timeout,
68                skip_timeout: cfg.skip_timeout,
69            },
70        );
71
72        // Create voter
73        let (voter, voter_mailbox) = voter::Actor::new(
74            context.with_label("voter"),
75            voter::Config {
76                scheme: cfg.scheme.clone(),
77                elector: cfg.elector,
78                blocker: cfg.blocker.clone(),
79                automaton: cfg.automaton,
80                relay: cfg.relay,
81                reporter: cfg.reporter,
82                partition: cfg.partition,
83                mailbox_size: cfg.mailbox_size,
84                epoch: cfg.epoch,
85                leader_timeout: cfg.leader_timeout,
86                notarization_timeout: cfg.notarization_timeout,
87                nullify_retry: cfg.nullify_retry,
88                activity_timeout: cfg.activity_timeout,
89                replay_buffer: cfg.replay_buffer,
90                write_buffer: cfg.write_buffer,
91                buffer_pool: cfg.buffer_pool,
92            },
93        );
94
95        // Create resolver
96        let (resolver, resolver_mailbox) = resolver::Actor::new(
97            context.with_label("resolver"),
98            resolver::Config {
99                blocker: cfg.blocker,
100                scheme: cfg.scheme,
101                strategy: cfg.strategy,
102                mailbox_size: cfg.mailbox_size,
103                epoch: cfg.epoch,
104                fetch_concurrent: cfg.fetch_concurrent,
105                fetch_timeout: cfg.fetch_timeout,
106            },
107        );
108
109        // Return the engine
110        Self {
111            context: ContextCell::new(context),
112
113            voter,
114            voter_mailbox,
115
116            batcher,
117            batcher_mailbox,
118
119            resolver,
120            resolver_mailbox,
121        }
122    }
123
124    /// Start the `simplex` consensus engine.
125    ///
126    /// This will also rebuild the state of the engine from provided `Journal`.
127    ///
128    /// # Network Channels
129    ///
130    /// The engine requires three separate network channels, each carrying votes or
131    /// certificates to help drive the consensus engine.
132    ///
133    /// ## `vote_network`
134    ///
135    /// Carries **individual votes**:
136    /// - [`Notarize`](super::types::Notarize): Vote to notarize a proposal
137    /// - [`Nullify`](super::types::Nullify): Vote to skip a view
138    /// - [`Finalize`](super::types::Finalize): Vote to finalize a notarized proposal
139    ///
140    /// These messages are sent to the batcher, which performs batch signature
141    /// verification before forwarding valid votes to the voter for aggregation.
142    ///
143    /// ## `certificate_network`
144    ///
145    /// Carries **certificates**:
146    /// - [`Notarization`](super::types::Notarization): Proof that a proposal was notarized
147    /// - [`Nullification`](super::types::Nullification): Proof that a view was skipped
148    /// - [`Finalization`](super::types::Finalization): Proof that a proposal was finalized
149    ///
150    /// Certificates are broadcast on this channel as soon as they are constructed
151    /// from collected votes. We separate this from the `vote_network` to optimistically
152    /// allow for certificate processing to short-circuit vote processing (if we receive
153    /// a certificate before processing pending votes, we can skip them).
154    ///
155    /// ## `resolver_network`
156    ///
157    /// Used for request-response certificate fetching. When a node needs to
158    /// catch up on a view it missed (e.g., to verify a proposal's parent), it
159    /// uses this channel to request certificates from peers. The resolver handles
160    /// rate limiting, retries, and peer selection for these requests.
161    pub fn start(
162        mut self,
163        vote_network: (
164            impl Sender<PublicKey = S::PublicKey>,
165            impl Receiver<PublicKey = S::PublicKey>,
166        ),
167        certificate_network: (
168            impl Sender<PublicKey = S::PublicKey>,
169            impl Receiver<PublicKey = S::PublicKey>,
170        ),
171        resolver_network: (
172            impl Sender<PublicKey = S::PublicKey>,
173            impl Receiver<PublicKey = S::PublicKey>,
174        ),
175    ) -> Handle<()> {
176        spawn_cell!(
177            self.context,
178            self.run(vote_network, certificate_network, resolver_network)
179                .await
180        )
181    }
182
183    async fn run(
184        self,
185        vote_network: (
186            impl Sender<PublicKey = S::PublicKey>,
187            impl Receiver<PublicKey = S::PublicKey>,
188        ),
189        certificate_network: (
190            impl Sender<PublicKey = S::PublicKey>,
191            impl Receiver<PublicKey = S::PublicKey>,
192        ),
193        resolver_network: (
194            impl Sender<PublicKey = S::PublicKey>,
195            impl Receiver<PublicKey = S::PublicKey>,
196        ),
197    ) {
198        // Start the batcher (receives votes via vote_network, certificates via certificate_network)
199        // Batcher sends proposals/certificates to voter via voter_mailbox
200        let (vote_sender, vote_receiver) = vote_network;
201        let (certificate_sender, certificate_receiver) = certificate_network;
202        let mut batcher_task = self.batcher.start(
203            self.voter_mailbox.clone(),
204            vote_receiver,
205            certificate_receiver,
206        );
207
208        // Start the resolver (sends certificates to voter via voter_mailbox)
209        let (resolver_sender, resolver_receiver) = resolver_network;
210        let mut resolver_task =
211            self.resolver
212                .start(self.voter_mailbox, resolver_sender, resolver_receiver);
213
214        // Start the voter
215        let mut voter_task = self.voter.start(
216            self.batcher_mailbox,
217            self.resolver_mailbox,
218            vote_sender,
219            certificate_sender,
220        );
221
222        // Wait for the resolver or voter to finish
223        let mut shutdown = self.context.stopped();
224        select! {
225            _ = &mut shutdown => {
226                debug!("context shutdown, stopping engine");
227            },
228            _ = &mut voter_task => {
229                panic!("voter should not finish");
230            },
231            _ = &mut batcher_task => {
232                panic!("batcher should not finish");
233            },
234            _ = &mut resolver_task => {
235                panic!("resolver should not finish");
236            },
237        }
238    }
239}