commonware_consensus/simplex/
engine.rs

1use super::{
2    actors::{resolver, voter},
3    config::Config,
4    Context, View,
5};
6use crate::{Automaton, Committer, Relay, Supervisor};
7use commonware_cryptography::Scheme;
8use commonware_macros::select;
9use commonware_p2p::{Receiver, Sender};
10use commonware_runtime::{Blob, Clock, Handle, Metrics, Spawner, Storage};
11use commonware_storage::journal::variable::Journal;
12use commonware_utils::Array;
13use governor::clock::Clock as GClock;
14use rand::{CryptoRng, Rng};
15use tracing::debug;
16
17/// Instance of `simplex` consensus engine.
18pub struct Engine<
19    B: Blob,
20    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics,
21    C: Scheme,
22    D: Array,
23    A: Automaton<Context = Context<D>, Digest = D>,
24    R: Relay<Digest = D>,
25    F: Committer<Digest = D>,
26    S: Supervisor<Index = View, PublicKey = C::PublicKey>,
27> {
28    context: E,
29
30    voter: voter::Actor<B, E, C, D, A, R, F, S>,
31    voter_mailbox: voter::Mailbox<D>,
32    resolver: resolver::Actor<E, C, D, S>,
33    resolver_mailbox: resolver::Mailbox,
34}
35
36impl<
37        B: Blob,
38        E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics,
39        C: Scheme,
40        D: Array,
41        A: Automaton<Context = Context<D>, Digest = D>,
42        R: Relay<Digest = D>,
43        F: Committer<Digest = D>,
44        S: Supervisor<Index = View, PublicKey = C::PublicKey>,
45    > Engine<B, E, C, D, A, R, F, S>
46{
47    /// Create a new `simplex` consensus engine.
48    pub fn new(context: E, journal: Journal<B, E>, cfg: Config<C, D, A, R, F, S>) -> Self {
49        // Ensure configuration is valid
50        cfg.assert();
51
52        // Create voter
53        let (voter, voter_mailbox) = voter::Actor::new(
54            context.with_label("voter"),
55            journal,
56            voter::Config {
57                crypto: cfg.crypto.clone(),
58                automaton: cfg.automaton,
59                relay: cfg.relay,
60                committer: cfg.committer,
61                supervisor: cfg.supervisor.clone(),
62                mailbox_size: cfg.mailbox_size,
63                namespace: cfg.namespace.clone(),
64                leader_timeout: cfg.leader_timeout,
65                notarization_timeout: cfg.notarization_timeout,
66                nullify_retry: cfg.nullify_retry,
67                activity_timeout: cfg.activity_timeout,
68                replay_concurrency: cfg.replay_concurrency,
69            },
70        );
71
72        // Create resolver
73        let (resolver, resolver_mailbox) = resolver::Actor::new(
74            context.with_label("resolver"),
75            resolver::Config {
76                crypto: cfg.crypto,
77                supervisor: cfg.supervisor,
78                mailbox_size: cfg.mailbox_size,
79                namespace: cfg.namespace,
80                activity_timeout: cfg.activity_timeout,
81                fetch_timeout: cfg.fetch_timeout,
82                fetch_concurrent: cfg.fetch_concurrent,
83                max_fetch_count: cfg.max_fetch_count,
84                max_fetch_size: cfg.max_fetch_size,
85                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
86            },
87        );
88
89        // Return the engine
90        Self {
91            context,
92
93            voter,
94            voter_mailbox,
95            resolver,
96            resolver_mailbox,
97        }
98    }
99
100    /// Start the `simplex` consensus engine.
101    ///
102    /// This will also rebuild the state of the engine from provided `Journal`.
103    pub fn start(
104        self,
105        voter_network: (
106            impl Sender<PublicKey = C::PublicKey>,
107            impl Receiver<PublicKey = C::PublicKey>,
108        ),
109        resolver_network: (
110            impl Sender<PublicKey = C::PublicKey>,
111            impl Receiver<PublicKey = C::PublicKey>,
112        ),
113    ) -> Handle<()> {
114        self.context
115            .clone()
116            .spawn(|_| self.run(voter_network, resolver_network))
117    }
118
119    async fn run(
120        self,
121        voter_network: (
122            impl Sender<PublicKey = C::PublicKey>,
123            impl Receiver<PublicKey = C::PublicKey>,
124        ),
125        resolver_network: (
126            impl Sender<PublicKey = C::PublicKey>,
127            impl Receiver<PublicKey = C::PublicKey>,
128        ),
129    ) {
130        // Start the voter
131        let (voter_sender, voter_receiver) = voter_network;
132        let mut voter_task = self
133            .voter
134            .start(self.resolver_mailbox, voter_sender, voter_receiver);
135
136        // Start the resolver
137        let (resolver_sender, resolver_receiver) = resolver_network;
138        let mut resolver_task =
139            self.resolver
140                .start(self.voter_mailbox, resolver_sender, resolver_receiver);
141
142        // Wait for the resolver or voter to finish
143        select! {
144            _ = &mut voter_task => {
145                debug!("voter finished");
146                resolver_task.abort();
147            },
148            _ = &mut resolver_task => {
149                debug!("resolver finished");
150                voter_task.abort();
151            },
152        }
153    }
154}