Skip to main content

commonware_consensus/simplex/
engine.rs

1use super::{
2    actors::{batcher, resolver, voter},
3    config::Config,
4    elector::Config as Elector,
5    types::{Activity, Context},
6};
7use crate::{simplex::scheme::Scheme, CertifiableAutomaton, Relay, Reporter};
8use commonware_cryptography::Digest;
9use commonware_macros::select;
10use commonware_p2p::{Blocker, Receiver, Sender};
11use commonware_parallel::Strategy;
12use commonware_runtime::{
13    spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
14};
15use rand_core::CryptoRngCore;
16use tracing::debug;
17
18/// Instance of `simplex` consensus engine.
19pub struct Engine<
20    E: BufferPooler + Clock + CryptoRngCore + Spawner + Storage + Metrics,
21    S: Scheme<D>,
22    L: Elector<S>,
23    B: Blocker<PublicKey = S::PublicKey>,
24    D: Digest,
25    A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
26    R: Relay<Digest = D>,
27    F: Reporter<Activity = Activity<S, D>>,
28    T: Strategy,
29> {
30    context: ContextCell<E>,
31
32    voter: voter::Actor<E, S, L, B, D, A, R, F>,
33    voter_mailbox: voter::Mailbox<S, D>,
34
35    batcher: batcher::Actor<E, S, B, D, F, T>,
36    batcher_mailbox: batcher::Mailbox<S, D>,
37
38    resolver: resolver::Actor<E, S, B, D, T>,
39    resolver_mailbox: resolver::Mailbox<S, D>,
40}
41
42impl<
43        E: BufferPooler + Clock + CryptoRngCore + Spawner + Storage + Metrics,
44        S: Scheme<D>,
45        L: Elector<S>,
46        B: Blocker<PublicKey = S::PublicKey>,
47        D: Digest,
48        A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
49        R: Relay<Digest = D>,
50        F: Reporter<Activity = Activity<S, D>>,
51        T: Strategy,
52    > Engine<E, S, L, B, D, A, R, F, T>
53{
54    /// Create a new `simplex` consensus engine.
55    pub fn new(context: E, cfg: Config<S, L, B, D, A, R, F, T>) -> Self {
56        // Ensure configuration is valid
57        cfg.assert();
58
59        // Create batcher
60        let (batcher, batcher_mailbox) = batcher::Actor::new(
61            context.with_label("batcher"),
62            batcher::Config {
63                scheme: cfg.scheme.clone(),
64                blocker: cfg.blocker.clone(),
65                reporter: cfg.reporter.clone(),
66                strategy: cfg.strategy.clone(),
67                epoch: cfg.epoch,
68                mailbox_size: cfg.mailbox_size,
69                activity_timeout: cfg.activity_timeout,
70                skip_timeout: cfg.skip_timeout,
71            },
72        );
73
74        // Create voter
75        let (voter, voter_mailbox) = voter::Actor::new(
76            context.with_label("voter"),
77            voter::Config {
78                scheme: cfg.scheme.clone(),
79                elector: cfg.elector,
80                blocker: cfg.blocker.clone(),
81                automaton: cfg.automaton,
82                relay: cfg.relay,
83                reporter: cfg.reporter,
84                partition: cfg.partition,
85                mailbox_size: cfg.mailbox_size,
86                epoch: cfg.epoch,
87                leader_timeout: cfg.leader_timeout,
88                certification_timeout: cfg.certification_timeout,
89                timeout_retry: cfg.timeout_retry,
90                activity_timeout: cfg.activity_timeout,
91                replay_buffer: cfg.replay_buffer,
92                write_buffer: cfg.write_buffer,
93                page_cache: cfg.page_cache,
94            },
95        );
96
97        // Create resolver
98        let (resolver, resolver_mailbox) = resolver::Actor::new(
99            context.with_label("resolver"),
100            resolver::Config {
101                blocker: cfg.blocker,
102                scheme: cfg.scheme,
103                strategy: cfg.strategy,
104                mailbox_size: cfg.mailbox_size,
105                epoch: cfg.epoch,
106                fetch_concurrent: cfg.fetch_concurrent,
107                fetch_timeout: cfg.fetch_timeout,
108            },
109        );
110
111        // Return the engine
112        Self {
113            context: ContextCell::new(context),
114
115            voter,
116            voter_mailbox,
117
118            batcher,
119            batcher_mailbox,
120
121            resolver,
122            resolver_mailbox,
123        }
124    }
125
126    /// Start the `simplex` consensus engine.
127    ///
128    /// This will also rebuild the state of the engine from provided `Journal`.
129    ///
130    /// # Network Channels
131    ///
132    /// The engine requires three separate network channels, each carrying votes or
133    /// certificates to help drive the consensus engine.
134    ///
135    /// ## `vote_network`
136    ///
137    /// Carries **individual votes**:
138    /// - [`Notarize`](super::types::Notarize): Vote to notarize a proposal
139    /// - [`Nullify`](super::types::Nullify): Vote to skip a view
140    /// - [`Finalize`](super::types::Finalize): Vote to finalize a notarized proposal
141    ///
142    /// These messages are sent to the batcher, which performs batch signature
143    /// verification before forwarding valid votes to the voter for aggregation.
144    ///
145    /// ## `certificate_network`
146    ///
147    /// Carries **certificates**:
148    /// - [`Notarization`](super::types::Notarization): Proof that a proposal was notarized
149    /// - [`Nullification`](super::types::Nullification): Proof that a view was skipped
150    /// - [`Finalization`](super::types::Finalization): Proof that a proposal was finalized
151    ///
152    /// Certificates are broadcast on this channel as soon as they are constructed
153    /// from collected votes. We separate this from the `vote_network` to optimistically
154    /// allow for certificate processing to short-circuit vote processing (if we receive
155    /// a certificate before processing pending votes, we can skip them).
156    ///
157    /// ## `resolver_network`
158    ///
159    /// Used for request-response certificate fetching. When a node needs to
160    /// catch up on a view it missed (e.g., to verify a proposal's parent), it
161    /// uses this channel to request certificates from peers. The resolver handles
162    /// rate limiting, retries, and peer selection for these requests.
163    pub fn start(
164        mut self,
165        vote_network: (
166            impl Sender<PublicKey = S::PublicKey>,
167            impl Receiver<PublicKey = S::PublicKey>,
168        ),
169        certificate_network: (
170            impl Sender<PublicKey = S::PublicKey>,
171            impl Receiver<PublicKey = S::PublicKey>,
172        ),
173        resolver_network: (
174            impl Sender<PublicKey = S::PublicKey>,
175            impl Receiver<PublicKey = S::PublicKey>,
176        ),
177    ) -> Handle<()> {
178        spawn_cell!(
179            self.context,
180            self.run(vote_network, certificate_network, resolver_network)
181                .await
182        )
183    }
184
185    async fn run(
186        self,
187        vote_network: (
188            impl Sender<PublicKey = S::PublicKey>,
189            impl Receiver<PublicKey = S::PublicKey>,
190        ),
191        certificate_network: (
192            impl Sender<PublicKey = S::PublicKey>,
193            impl Receiver<PublicKey = S::PublicKey>,
194        ),
195        resolver_network: (
196            impl Sender<PublicKey = S::PublicKey>,
197            impl Receiver<PublicKey = S::PublicKey>,
198        ),
199    ) {
200        // Start the batcher (receives votes via vote_network, certificates via certificate_network)
201        // Batcher sends proposals/certificates to voter via voter_mailbox
202        let (vote_sender, vote_receiver) = vote_network;
203        let (certificate_sender, certificate_receiver) = certificate_network;
204        let mut batcher_task = self.batcher.start(
205            self.voter_mailbox.clone(),
206            vote_receiver,
207            certificate_receiver,
208        );
209
210        // Start the resolver (sends certificates to voter via voter_mailbox)
211        let (resolver_sender, resolver_receiver) = resolver_network;
212        let mut resolver_task =
213            self.resolver
214                .start(self.voter_mailbox, resolver_sender, resolver_receiver);
215
216        // Start the voter
217        let mut voter_task = self.voter.start(
218            self.batcher_mailbox,
219            self.resolver_mailbox,
220            vote_sender,
221            certificate_sender,
222        );
223
224        // Wait for the resolver or voter to finish
225        let mut shutdown = self.context.stopped();
226        select! {
227            _ = &mut shutdown => {
228                debug!("context shutdown, stopping engine");
229            },
230            _ = &mut voter_task => {
231                panic!("voter should not finish");
232            },
233            _ = &mut batcher_task => {
234                panic!("batcher should not finish");
235            },
236            _ = &mut resolver_task => {
237                panic!("resolver should not finish");
238            },
239        }
240    }
241}