commonware_consensus/simplex/
engine.rs

1use super::{
2    actors::{batcher, resolver, voter},
3    config::Config,
4    types::{Activity, Context},
5};
6use crate::{simplex::signing_scheme::Scheme, Automaton, Relay, Reporter};
7use commonware_cryptography::{Digest, PublicKey};
8use commonware_macros::select;
9use commonware_p2p::{Blocker, Receiver, Sender};
10use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage};
11use governor::clock::Clock as GClock;
12use rand::{CryptoRng, Rng};
13use tracing::debug;
14
15/// Instance of `simplex` consensus engine.
16pub struct Engine<
17    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
18    P: PublicKey,
19    S: Scheme<PublicKey = P>,
20    B: Blocker<PublicKey = P>,
21    D: Digest,
22    A: Automaton<Context = Context<D, P>, Digest = D>,
23    R: Relay<Digest = D>,
24    F: Reporter<Activity = Activity<S, D>>,
25> {
26    context: ContextCell<E>,
27
28    voter: voter::Actor<E, P, S, B, D, A, R, F>,
29    voter_mailbox: voter::Mailbox<S, D>,
30
31    batcher: batcher::Actor<E, P, S, B, D, F>,
32    batcher_mailbox: batcher::Mailbox<S, D>,
33
34    resolver: resolver::Actor<E, P, S, B, D>,
35    resolver_mailbox: resolver::Mailbox<S, D>,
36}
37
38impl<
39        E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
40        P: PublicKey,
41        S: Scheme<PublicKey = P>,
42        B: Blocker<PublicKey = P>,
43        D: Digest,
44        A: Automaton<Context = Context<D, P>, Digest = D>,
45        R: Relay<Digest = D>,
46        F: Reporter<Activity = Activity<S, D>>,
47    > Engine<E, P, S, B, D, A, R, F>
48{
49    /// Create a new `simplex` consensus engine.
50    pub fn new(context: E, cfg: Config<P, S, B, D, A, R, F>) -> Self {
51        // Ensure configuration is valid
52        cfg.assert();
53
54        // Create batcher
55        let (batcher, batcher_mailbox) = batcher::Actor::new(
56            context.with_label("batcher"),
57            batcher::Config {
58                scheme: cfg.scheme.clone(),
59                blocker: cfg.blocker.clone(),
60                reporter: cfg.reporter.clone(),
61                epoch: cfg.epoch,
62                namespace: cfg.namespace.clone(),
63                mailbox_size: cfg.mailbox_size,
64                activity_timeout: cfg.activity_timeout,
65                skip_timeout: cfg.skip_timeout,
66            },
67        );
68
69        // Create voter
70        let (voter, voter_mailbox) = voter::Actor::new(
71            context.with_label("voter"),
72            voter::Config {
73                scheme: cfg.scheme.clone(),
74                blocker: cfg.blocker.clone(),
75                automaton: cfg.automaton,
76                relay: cfg.relay,
77                reporter: cfg.reporter,
78                partition: cfg.partition,
79                mailbox_size: cfg.mailbox_size,
80                epoch: cfg.epoch,
81                namespace: cfg.namespace.clone(),
82                leader_timeout: cfg.leader_timeout,
83                notarization_timeout: cfg.notarization_timeout,
84                nullify_retry: cfg.nullify_retry,
85                activity_timeout: cfg.activity_timeout,
86                replay_buffer: cfg.replay_buffer,
87                write_buffer: cfg.write_buffer,
88                buffer_pool: cfg.buffer_pool,
89            },
90        );
91
92        // Create resolver
93        let (resolver, resolver_mailbox) = resolver::Actor::new(
94            context.with_label("resolver"),
95            resolver::Config {
96                blocker: cfg.blocker,
97                scheme: cfg.scheme,
98                mailbox_size: cfg.mailbox_size,
99                epoch: cfg.epoch,
100                namespace: cfg.namespace,
101                activity_timeout: cfg.activity_timeout,
102                fetch_timeout: cfg.fetch_timeout,
103                fetch_concurrent: cfg.fetch_concurrent,
104                max_fetch_count: cfg.max_fetch_count,
105                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
106            },
107        );
108
109        // Return the engine
110        Self {
111            context: ContextCell::new(context),
112
113            voter,
114            voter_mailbox,
115
116            batcher,
117            batcher_mailbox,
118
119            resolver,
120            resolver_mailbox,
121        }
122    }
123
124    /// Start the `simplex` consensus engine.
125    ///
126    /// This will also rebuild the state of the engine from provided `Journal`.
127    pub fn start(
128        mut self,
129        pending_network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
130        recovered_network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
131        resolver_network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
132    ) -> Handle<()> {
133        spawn_cell!(
134            self.context,
135            self.run(pending_network, recovered_network, resolver_network)
136                .await
137        )
138    }
139
140    async fn run(
141        self,
142        pending_network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
143        recovered_network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
144        resolver_network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
145    ) {
146        // Start the batcher
147        let (pending_sender, pending_receiver) = pending_network;
148        let mut batcher_task = self
149            .batcher
150            .start(self.voter_mailbox.clone(), pending_receiver);
151
152        // Start the resolver
153        let (resolver_sender, resolver_receiver) = resolver_network;
154        let mut resolver_task =
155            self.resolver
156                .start(self.voter_mailbox, resolver_sender, resolver_receiver);
157
158        // Start the voter
159        let (recovered_sender, recovered_receiver) = recovered_network;
160        let mut voter_task = self.voter.start(
161            self.batcher_mailbox,
162            self.resolver_mailbox,
163            pending_sender,
164            recovered_sender,
165            recovered_receiver,
166        );
167
168        // Wait for the resolver or voter to finish
169        let mut shutdown = self.context.stopped();
170        select! {
171            _ = &mut shutdown => {
172                debug!("shutdown");
173            },
174            _ = &mut voter_task => {
175                panic!("voter should not finish");
176            },
177            _ = &mut batcher_task => {
178                panic!("batcher should not finish");
179            },
180            _ = &mut resolver_task => {
181                panic!("resolver should not finish");
182            },
183        }
184    }
185}