Skip to main content

commonware_consensus/simplex/
engine.rs

1use super::{
2    actors::{batcher, resolver, voter},
3    config::Config,
4    elector::Config as Elector,
5    types::{Activity, Context},
6};
7use crate::{
8    simplex::{scheme::Scheme, Plan},
9    CertifiableAutomaton, Relay, Reporter,
10};
11use commonware_cryptography::Digest;
12use commonware_macros::select;
13use commonware_p2p::{Blocker, Receiver, Sender};
14use commonware_parallel::Strategy;
15use commonware_runtime::{
16    spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
17};
18use rand_core::CryptoRngCore;
19use tracing::debug;
20
21/// Instance of `simplex` consensus engine.
22pub struct Engine<
23    E: BufferPooler + Clock + CryptoRngCore + Spawner + Storage + Metrics,
24    S: Scheme<D>,
25    L: Elector<S>,
26    B: Blocker<PublicKey = S::PublicKey>,
27    D: Digest,
28    A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
29    R: Relay<Digest = D, PublicKey = S::PublicKey, Plan = Plan<S::PublicKey>>,
30    F: Reporter<Activity = Activity<S, D>>,
31    T: Strategy,
32> {
33    context: ContextCell<E>,
34
35    voter: voter::Actor<E, S, L, B, D, A, R, F>,
36    voter_mailbox: voter::Mailbox<S, D>,
37
38    batcher: batcher::Actor<E, S, B, D, F, R, T>,
39    batcher_mailbox: batcher::Mailbox<S, D>,
40
41    resolver: resolver::Actor<E, S, B, D, T>,
42    resolver_mailbox: resolver::Mailbox<S, D>,
43}
44
45impl<
46        E: BufferPooler + Clock + CryptoRngCore + Spawner + Storage + Metrics,
47        S: Scheme<D>,
48        L: Elector<S>,
49        B: Blocker<PublicKey = S::PublicKey>,
50        D: Digest,
51        A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
52        R: Relay<Digest = D, PublicKey = S::PublicKey, Plan = Plan<S::PublicKey>>,
53        F: Reporter<Activity = Activity<S, D>>,
54        T: Strategy,
55    > Engine<E, S, L, B, D, A, R, F, T>
56{
57    /// Create a new `simplex` consensus engine.
58    pub fn new(context: E, cfg: Config<S, L, B, D, A, R, F, T>) -> Self {
59        // Ensure configuration is valid
60        cfg.assert();
61
62        // Create batcher
63        let (batcher, batcher_mailbox) = batcher::Actor::new(
64            context.with_label("batcher"),
65            batcher::Config {
66                scheme: cfg.scheme.clone(),
67                blocker: cfg.blocker.clone(),
68                reporter: cfg.reporter.clone(),
69                relay: cfg.relay.clone(),
70                strategy: cfg.strategy.clone(),
71                epoch: cfg.epoch,
72                mailbox_size: cfg.mailbox_size,
73                activity_timeout: cfg.activity_timeout,
74                skip_timeout: cfg.skip_timeout,
75                forwarding: cfg.forwarding,
76            },
77        );
78
79        // Create voter
80        let (voter, voter_mailbox) = voter::Actor::new(
81            context.with_label("voter"),
82            voter::Config {
83                scheme: cfg.scheme.clone(),
84                elector: cfg.elector,
85                blocker: cfg.blocker.clone(),
86                automaton: cfg.automaton,
87                relay: cfg.relay,
88                reporter: cfg.reporter,
89                partition: cfg.partition,
90                mailbox_size: cfg.mailbox_size,
91                epoch: cfg.epoch,
92                leader_timeout: cfg.leader_timeout,
93                certification_timeout: cfg.certification_timeout,
94                timeout_retry: cfg.timeout_retry,
95                activity_timeout: cfg.activity_timeout,
96                replay_buffer: cfg.replay_buffer,
97                write_buffer: cfg.write_buffer,
98                page_cache: cfg.page_cache,
99            },
100        );
101
102        // Create resolver
103        let (resolver, resolver_mailbox) = resolver::Actor::new(
104            context.with_label("resolver"),
105            resolver::Config {
106                blocker: cfg.blocker,
107                scheme: cfg.scheme,
108                strategy: cfg.strategy,
109                mailbox_size: cfg.mailbox_size,
110                epoch: cfg.epoch,
111                fetch_concurrent: cfg.fetch_concurrent,
112                fetch_timeout: cfg.fetch_timeout,
113            },
114        );
115
116        // Return the engine
117        Self {
118            context: ContextCell::new(context),
119
120            voter,
121            voter_mailbox,
122
123            batcher,
124            batcher_mailbox,
125
126            resolver,
127            resolver_mailbox,
128        }
129    }
130
131    /// Start the `simplex` consensus engine.
132    ///
133    /// This will also rebuild the state of the engine from provided `Journal`.
134    ///
135    /// # Network Channels
136    ///
137    /// The engine requires three separate network channels, each carrying votes or
138    /// certificates to help drive the consensus engine.
139    ///
140    /// ## `vote_network`
141    ///
142    /// Carries **individual votes**:
143    /// - [`Notarize`](super::types::Notarize): Vote to notarize a proposal
144    /// - [`Nullify`](super::types::Nullify): Vote to skip a view
145    /// - [`Finalize`](super::types::Finalize): Vote to finalize a notarized proposal
146    ///
147    /// These messages are sent to the batcher, which performs batch signature
148    /// verification before forwarding valid votes to the voter for aggregation.
149    ///
150    /// ## `certificate_network`
151    ///
152    /// Carries **certificates**:
153    /// - [`Notarization`](super::types::Notarization): Proof that a proposal was notarized
154    /// - [`Nullification`](super::types::Nullification): Proof that a view was skipped
155    /// - [`Finalization`](super::types::Finalization): Proof that a proposal was finalized
156    ///
157    /// Certificates are broadcast on this channel as soon as they are constructed
158    /// from collected votes. We separate this from the `vote_network` to optimistically
159    /// allow for certificate processing to short-circuit vote processing (if we receive
160    /// a certificate before processing pending votes, we can skip them).
161    ///
162    /// ## `resolver_network`
163    ///
164    /// Used for request-response certificate fetching. When a node needs to
165    /// catch up on a view it missed (e.g., to verify a proposal's parent), it
166    /// uses this channel to request certificates from peers. The resolver handles
167    /// rate limiting, retries, and peer selection for these requests.
168    pub fn start(
169        mut self,
170        vote_network: (
171            impl Sender<PublicKey = S::PublicKey>,
172            impl Receiver<PublicKey = S::PublicKey>,
173        ),
174        certificate_network: (
175            impl Sender<PublicKey = S::PublicKey>,
176            impl Receiver<PublicKey = S::PublicKey>,
177        ),
178        resolver_network: (
179            impl Sender<PublicKey = S::PublicKey>,
180            impl Receiver<PublicKey = S::PublicKey>,
181        ),
182    ) -> Handle<()> {
183        spawn_cell!(
184            self.context,
185            self.run(vote_network, certificate_network, resolver_network)
186                .await
187        )
188    }
189
190    async fn run(
191        self,
192        vote_network: (
193            impl Sender<PublicKey = S::PublicKey>,
194            impl Receiver<PublicKey = S::PublicKey>,
195        ),
196        certificate_network: (
197            impl Sender<PublicKey = S::PublicKey>,
198            impl Receiver<PublicKey = S::PublicKey>,
199        ),
200        resolver_network: (
201            impl Sender<PublicKey = S::PublicKey>,
202            impl Receiver<PublicKey = S::PublicKey>,
203        ),
204    ) {
205        // Start the batcher (receives votes via vote_network, certificates via certificate_network)
206        // Batcher sends proposals/certificates to voter via voter_mailbox
207        let (vote_sender, vote_receiver) = vote_network;
208        let (certificate_sender, certificate_receiver) = certificate_network;
209        let mut batcher_task = self.batcher.start(
210            self.voter_mailbox.clone(),
211            vote_receiver,
212            certificate_receiver,
213        );
214
215        // Start the resolver (sends certificates to voter via voter_mailbox)
216        let (resolver_sender, resolver_receiver) = resolver_network;
217        let mut resolver_task =
218            self.resolver
219                .start(self.voter_mailbox, resolver_sender, resolver_receiver);
220
221        // Start the voter
222        let mut voter_task = self.voter.start(
223            self.batcher_mailbox,
224            self.resolver_mailbox,
225            vote_sender,
226            certificate_sender,
227        );
228
229        // Wait for the resolver or voter to finish
230        let mut shutdown = self.context.stopped();
231        select! {
232            _ = &mut shutdown => {
233                debug!("context shutdown, stopping engine");
234            },
235            _ = &mut voter_task => {
236                panic!("voter should not finish");
237            },
238            _ = &mut batcher_task => {
239                panic!("batcher should not finish");
240            },
241            _ = &mut resolver_task => {
242                panic!("resolver should not finish");
243            },
244        }
245    }
246}