commonware_consensus/simplex/
engine.rs

1use super::{
2    actors::{batcher, resolver, voter},
3    config::Config,
4    elector::Config as Elector,
5    types::{Activity, Context},
6};
7use crate::{simplex::scheme::Scheme, CertifiableAutomaton, Relay, Reporter};
8use commonware_cryptography::Digest;
9use commonware_macros::select;
10use commonware_p2p::{Blocker, Receiver, Sender};
11use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage};
12use rand::{CryptoRng, Rng};
13use tracing::debug;
14
15/// Instance of `simplex` consensus engine.
16pub struct Engine<
17    E: Clock + Rng + CryptoRng + Spawner + Storage + Metrics,
18    S: Scheme<D>,
19    L: Elector<S>,
20    B: Blocker<PublicKey = S::PublicKey>,
21    D: Digest,
22    A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
23    R: Relay<Digest = D>,
24    F: Reporter<Activity = Activity<S, D>>,
25> {
26    context: ContextCell<E>,
27
28    voter: voter::Actor<E, S, L, B, D, A, R, F>,
29    voter_mailbox: voter::Mailbox<S, D>,
30
31    batcher: batcher::Actor<E, S, B, D, F>,
32    batcher_mailbox: batcher::Mailbox<S, D>,
33
34    resolver: resolver::Actor<E, S, B, D>,
35    resolver_mailbox: resolver::Mailbox<S, D>,
36}
37
38impl<
39        E: Clock + Rng + CryptoRng + Spawner + Storage + Metrics,
40        S: Scheme<D>,
41        L: Elector<S>,
42        B: Blocker<PublicKey = S::PublicKey>,
43        D: Digest,
44        A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
45        R: Relay<Digest = D>,
46        F: Reporter<Activity = Activity<S, D>>,
47    > Engine<E, S, L, B, D, A, R, F>
48{
49    /// Create a new `simplex` consensus engine.
50    pub fn new(context: E, cfg: Config<S, L, B, D, A, R, F>) -> Self {
51        // Ensure configuration is valid
52        cfg.assert();
53
54        // Create batcher
55        let (batcher, batcher_mailbox) = batcher::Actor::new(
56            context.with_label("batcher"),
57            batcher::Config {
58                scheme: cfg.scheme.clone(),
59                blocker: cfg.blocker.clone(),
60                reporter: cfg.reporter.clone(),
61                epoch: cfg.epoch,
62                namespace: cfg.namespace.clone(),
63                mailbox_size: cfg.mailbox_size,
64                activity_timeout: cfg.activity_timeout,
65                skip_timeout: cfg.skip_timeout,
66            },
67        );
68
69        // Create voter
70        let (voter, voter_mailbox) = voter::Actor::new(
71            context.with_label("voter"),
72            voter::Config {
73                scheme: cfg.scheme.clone(),
74                elector: cfg.elector,
75                blocker: cfg.blocker.clone(),
76                automaton: cfg.automaton,
77                relay: cfg.relay,
78                reporter: cfg.reporter,
79                partition: cfg.partition,
80                mailbox_size: cfg.mailbox_size,
81                epoch: cfg.epoch,
82                namespace: cfg.namespace.clone(),
83                leader_timeout: cfg.leader_timeout,
84                notarization_timeout: cfg.notarization_timeout,
85                nullify_retry: cfg.nullify_retry,
86                activity_timeout: cfg.activity_timeout,
87                replay_buffer: cfg.replay_buffer,
88                write_buffer: cfg.write_buffer,
89                buffer_pool: cfg.buffer_pool,
90            },
91        );
92
93        // Create resolver
94        let (resolver, resolver_mailbox) = resolver::Actor::new(
95            context.with_label("resolver"),
96            resolver::Config {
97                blocker: cfg.blocker,
98                scheme: cfg.scheme,
99                mailbox_size: cfg.mailbox_size,
100                epoch: cfg.epoch,
101                namespace: cfg.namespace,
102                fetch_concurrent: cfg.fetch_concurrent,
103                fetch_timeout: cfg.fetch_timeout,
104            },
105        );
106
107        // Return the engine
108        Self {
109            context: ContextCell::new(context),
110
111            voter,
112            voter_mailbox,
113
114            batcher,
115            batcher_mailbox,
116
117            resolver,
118            resolver_mailbox,
119        }
120    }
121
122    /// Start the `simplex` consensus engine.
123    ///
124    /// This will also rebuild the state of the engine from provided `Journal`.
125    ///
126    /// # Network Channels
127    ///
128    /// The engine requires three separate network channels, each carrying votes or
129    /// certificates to help drive the consensus engine.
130    ///
131    /// ## `vote_network`
132    ///
133    /// Carries **individual votes**:
134    /// - [`Notarize`](super::types::Notarize): Vote to notarize a proposal
135    /// - [`Nullify`](super::types::Nullify): Vote to skip a view
136    /// - [`Finalize`](super::types::Finalize): Vote to finalize a notarized proposal
137    ///
138    /// These messages are sent to the batcher, which performs batch signature
139    /// verification before forwarding valid votes to the voter for aggregation.
140    ///
141    /// ## `certificate_network`
142    ///
143    /// Carries **certificates**:
144    /// - [`Notarization`](super::types::Notarization): Proof that a proposal was notarized
145    /// - [`Nullification`](super::types::Nullification): Proof that a view was skipped
146    /// - [`Finalization`](super::types::Finalization): Proof that a proposal was finalized
147    ///
148    /// Certificates are broadcast on this channel as soon as they are constructed
149    /// from collected votes. We separate this from the `vote_network` to optimistically
150    /// allow for certificate processing to short-circuit vote processing (if we receive
151    /// a certificate before processing pending votes, we can skip them).
152    ///
153    /// ## `resolver_network`
154    ///
155    /// Used for request-response certificate fetching. When a node needs to
156    /// catch up on a view it missed (e.g., to verify a proposal's parent), it
157    /// uses this channel to request certificates from peers. The resolver handles
158    /// rate limiting, retries, and peer selection for these requests.
159    pub fn start(
160        mut self,
161        vote_network: (
162            impl Sender<PublicKey = S::PublicKey>,
163            impl Receiver<PublicKey = S::PublicKey>,
164        ),
165        certificate_network: (
166            impl Sender<PublicKey = S::PublicKey>,
167            impl Receiver<PublicKey = S::PublicKey>,
168        ),
169        resolver_network: (
170            impl Sender<PublicKey = S::PublicKey>,
171            impl Receiver<PublicKey = S::PublicKey>,
172        ),
173    ) -> Handle<()> {
174        spawn_cell!(
175            self.context,
176            self.run(vote_network, certificate_network, resolver_network)
177                .await
178        )
179    }
180
181    async fn run(
182        self,
183        vote_network: (
184            impl Sender<PublicKey = S::PublicKey>,
185            impl Receiver<PublicKey = S::PublicKey>,
186        ),
187        certificate_network: (
188            impl Sender<PublicKey = S::PublicKey>,
189            impl Receiver<PublicKey = S::PublicKey>,
190        ),
191        resolver_network: (
192            impl Sender<PublicKey = S::PublicKey>,
193            impl Receiver<PublicKey = S::PublicKey>,
194        ),
195    ) {
196        // Start the batcher (receives votes via vote_network, certificates via certificate_network)
197        // Batcher sends proposals/certificates to voter via voter_mailbox
198        let (vote_sender, vote_receiver) = vote_network;
199        let (certificate_sender, certificate_receiver) = certificate_network;
200        let mut batcher_task = self.batcher.start(
201            self.voter_mailbox.clone(),
202            vote_receiver,
203            certificate_receiver,
204        );
205
206        // Start the resolver (sends certificates to voter via voter_mailbox)
207        let (resolver_sender, resolver_receiver) = resolver_network;
208        let mut resolver_task =
209            self.resolver
210                .start(self.voter_mailbox, resolver_sender, resolver_receiver);
211
212        // Start the voter
213        let mut voter_task = self.voter.start(
214            self.batcher_mailbox,
215            self.resolver_mailbox,
216            vote_sender,
217            certificate_sender,
218        );
219
220        // Wait for the resolver or voter to finish
221        let mut shutdown = self.context.stopped();
222        select! {
223            _ = &mut shutdown => {
224                debug!("context shutdown, stopping engine");
225            },
226            _ = &mut voter_task => {
227                panic!("voter should not finish");
228            },
229            _ = &mut batcher_task => {
230                panic!("batcher should not finish");
231            },
232            _ = &mut resolver_task => {
233                panic!("resolver should not finish");
234            },
235        }
236    }
237}