Skip to main content

commonware_consensus/simplex/
engine.rs

1use super::{
2    actors::{batcher, resolver, voter},
3    config::Config,
4    elector::Config as Elector,
5    types::{Activity, Context},
6};
7use crate::{
8    simplex::{scheme::Scheme, Plan},
9    CertifiableAutomaton, Relay, Reporter,
10};
11use commonware_cryptography::Digest;
12use commonware_macros::select;
13use commonware_p2p::{Blocker, Receiver, Sender};
14use commonware_parallel::Strategy;
15use commonware_runtime::{
16    spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
17};
18use rand_core::CryptoRngCore;
19use tracing::debug;
20
21/// Instance of `simplex` consensus engine.
22pub struct Engine<
23    E: BufferPooler + Clock + CryptoRngCore + Spawner + Storage + Metrics,
24    S: Scheme<D>,
25    L: Elector<S>,
26    B: Blocker<PublicKey = S::PublicKey>,
27    D: Digest,
28    A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
29    R: Relay<Digest = D, PublicKey = S::PublicKey, Plan = Plan<S::PublicKey>>,
30    F: Reporter<Activity = Activity<S, D>>,
31    T: Strategy,
32> {
33    context: ContextCell<E>,
34
35    voter: voter::Actor<E, S, L, B, D, A, R, F>,
36    voter_mailbox: voter::Mailbox<S, D>,
37
38    batcher: batcher::Actor<E, S, B, D, F, R, T>,
39    batcher_mailbox: batcher::Mailbox<S, D>,
40
41    resolver: resolver::Actor<E, S, B, D, T>,
42    resolver_mailbox: resolver::Mailbox<S, D>,
43}
44
45impl<
46        E: BufferPooler + Clock + CryptoRngCore + Spawner + Storage + Metrics,
47        S: Scheme<D>,
48        L: Elector<S>,
49        B: Blocker<PublicKey = S::PublicKey>,
50        D: Digest,
51        A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
52        R: Relay<Digest = D, PublicKey = S::PublicKey, Plan = Plan<S::PublicKey>>,
53        F: Reporter<Activity = Activity<S, D>>,
54        T: Strategy,
55    > Engine<E, S, L, B, D, A, R, F, T>
56{
57    /// Create a new `simplex` consensus engine.
58    pub fn new(mut context: E, cfg: Config<S, L, B, D, A, R, F, T>) -> Self {
59        // Ensure configuration is valid
60        cfg.assert(&mut context);
61
62        // Create batcher
63        let (batcher, batcher_mailbox) = batcher::Actor::new(
64            context.child("batcher"),
65            batcher::Config {
66                scheme: cfg.scheme.clone(),
67                blocker: cfg.blocker.clone(),
68                reporter: cfg.reporter.clone(),
69                relay: cfg.relay.clone(),
70                strategy: cfg.strategy.clone(),
71                epoch: cfg.epoch,
72                mailbox_size: cfg.mailbox_size,
73                activity_timeout: cfg.activity_timeout,
74                skip_timeout: cfg.skip_timeout,
75                forwarding: cfg.forwarding,
76            },
77        );
78
79        // Create voter
80        let (voter, voter_mailbox) = voter::Actor::new(
81            context.child("voter"),
82            voter::Config {
83                scheme: cfg.scheme.clone(),
84                elector: cfg.elector,
85                blocker: cfg.blocker.clone(),
86                automaton: cfg.automaton,
87                relay: cfg.relay,
88                reporter: cfg.reporter,
89                partition: cfg.partition,
90                mailbox_size: cfg.mailbox_size,
91                epoch: cfg.epoch,
92                floor: cfg.floor,
93                leader_timeout: cfg.leader_timeout,
94                certification_timeout: cfg.certification_timeout,
95                timeout_retry: cfg.timeout_retry,
96                activity_timeout: cfg.activity_timeout,
97                replay_buffer: cfg.replay_buffer,
98                write_buffer: cfg.write_buffer,
99                page_cache: cfg.page_cache,
100            },
101        );
102
103        // Create resolver
104        let (resolver, resolver_mailbox) = resolver::Actor::new(
105            context.child("resolver"),
106            resolver::Config {
107                blocker: cfg.blocker,
108                scheme: cfg.scheme,
109                strategy: cfg.strategy,
110                mailbox_size: cfg.mailbox_size,
111                epoch: cfg.epoch,
112                fetch_concurrent: cfg.fetch_concurrent,
113                fetch_timeout: cfg.fetch_timeout,
114            },
115        );
116
117        // Return the engine
118        Self {
119            context: ContextCell::new(context),
120
121            voter,
122            voter_mailbox,
123
124            batcher,
125            batcher_mailbox,
126
127            resolver,
128            resolver_mailbox,
129        }
130    }
131
132    /// Start the `simplex` consensus engine.
133    ///
134    /// This will also rebuild the state of the engine from provided `Journal`.
135    ///
136    /// # Network Channels
137    ///
138    /// The engine requires three separate network channels, each carrying votes or
139    /// certificates to help drive the consensus engine.
140    ///
141    /// ## `vote_network`
142    ///
143    /// Carries **individual votes**:
144    /// - [`Notarize`](super::types::Notarize): Vote to notarize a proposal
145    /// - [`Nullify`](super::types::Nullify): Vote to skip a view
146    /// - [`Finalize`](super::types::Finalize): Vote to finalize a notarized proposal
147    ///
148    /// These messages are sent to the batcher, which performs batch signature
149    /// verification before forwarding valid votes to the voter for aggregation.
150    ///
151    /// ## `certificate_network`
152    ///
153    /// Carries **certificates**:
154    /// - [`Notarization`](super::types::Notarization): Proof that a proposal was notarized
155    /// - [`Nullification`](super::types::Nullification): Proof that a view was skipped
156    /// - [`Finalization`](super::types::Finalization): Proof that a proposal was finalized
157    ///
158    /// Certificates are broadcast on this channel as soon as they are constructed
159    /// from collected votes. We separate this from the `vote_network` to optimistically
160    /// allow for certificate processing to short-circuit vote processing (if we receive
161    /// a certificate before processing pending votes, we can skip them).
162    ///
163    /// ## `resolver_network`
164    ///
165    /// Used for request-response certificate fetching. When a node needs to
166    /// catch up on a view it missed (e.g., to verify a proposal's parent), it
167    /// uses this channel to request certificates from peers. The resolver handles
168    /// rate limiting, retries, and peer selection for these requests.
169    pub fn start(
170        mut self,
171        vote_network: (
172            impl Sender<PublicKey = S::PublicKey>,
173            impl Receiver<PublicKey = S::PublicKey>,
174        ),
175        certificate_network: (
176            impl Sender<PublicKey = S::PublicKey>,
177            impl Receiver<PublicKey = S::PublicKey>,
178        ),
179        resolver_network: (
180            impl Sender<PublicKey = S::PublicKey>,
181            impl Receiver<PublicKey = S::PublicKey>,
182        ),
183    ) -> Handle<()> {
184        spawn_cell!(
185            self.context,
186            self.run(vote_network, certificate_network, resolver_network)
187        )
188    }
189
190    async fn run(
191        self,
192        vote_network: (
193            impl Sender<PublicKey = S::PublicKey>,
194            impl Receiver<PublicKey = S::PublicKey>,
195        ),
196        certificate_network: (
197            impl Sender<PublicKey = S::PublicKey>,
198            impl Receiver<PublicKey = S::PublicKey>,
199        ),
200        resolver_network: (
201            impl Sender<PublicKey = S::PublicKey>,
202            impl Receiver<PublicKey = S::PublicKey>,
203        ),
204    ) {
205        // Start the batcher (receives votes via vote_network, certificates via certificate_network)
206        // Batcher sends proposals/certificates to voter via voter_mailbox
207        let (vote_sender, vote_receiver) = vote_network;
208        let (certificate_sender, certificate_receiver) = certificate_network;
209        let mut batcher_task = self.batcher.start(
210            self.voter_mailbox.clone(),
211            vote_receiver,
212            certificate_receiver,
213        );
214
215        // Start the resolver (sends certificates to voter via voter_mailbox)
216        let (resolver_sender, resolver_receiver) = resolver_network;
217        let mut resolver_task =
218            self.resolver
219                .start(self.voter_mailbox, resolver_sender, resolver_receiver);
220
221        // Start the voter
222        let mut voter_task = self.voter.start(
223            self.batcher_mailbox,
224            self.resolver_mailbox,
225            vote_sender,
226            certificate_sender,
227        );
228
229        // If any task completes, the engine should stop
230        let mut shutdown = self.context.stopped();
231        select! {
232            _ = &mut shutdown => {
233                debug!("context shutdown, stopping engine");
234            },
235            voter = &mut voter_task => {
236                debug!(?voter, "voter stopped, shutting down engine");
237            },
238            batcher = &mut batcher_task => {
239                debug!(?batcher, "batcher stopped, shutting down engine");
240            },
241            resolver = &mut resolver_task => {
242                debug!(?resolver, "resolver stopped, shutting down engine");
243            },
244        }
245    }
246}