commonware_consensus/simplex/
engine.rs

1use super::{
2    actors::{resolver, voter},
3    config::Config,
4    types::{Activity, Context, View},
5};
6use crate::{Automaton, Relay, Reporter, Supervisor};
7use commonware_cryptography::{Digest, Signer};
8use commonware_macros::select;
9use commonware_p2p::{Receiver, Sender};
10use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
11use governor::clock::Clock as GClock;
12use rand::{CryptoRng, Rng};
13use tracing::debug;
14
15/// Instance of `simplex` consensus engine.
16pub struct Engine<
17    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
18    C: Signer,
19    D: Digest,
20    A: Automaton<Context = Context<D>, Digest = D>,
21    R: Relay<Digest = D>,
22    F: Reporter<Activity = Activity<C::Signature, D>>,
23    S: Supervisor<Index = View, PublicKey = C::PublicKey>,
24> {
25    context: E,
26
27    voter: voter::Actor<E, C, D, A, R, F, S>,
28    voter_mailbox: voter::Mailbox<C::Signature, D>,
29    resolver: resolver::Actor<E, C::PublicKey, D, S>,
30    resolver_mailbox: resolver::Mailbox<C::Signature, D>,
31}
32
33impl<
34        E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
35        C: Signer,
36        D: Digest,
37        A: Automaton<Context = Context<D>, Digest = D>,
38        R: Relay<Digest = D>,
39        F: Reporter<Activity = Activity<C::Signature, D>>,
40        S: Supervisor<Index = View, PublicKey = C::PublicKey>,
41    > Engine<E, C, D, A, R, F, S>
42{
43    /// Create a new `simplex` consensus engine.
44    pub fn new(context: E, cfg: Config<C, D, A, R, F, S>) -> Self {
45        // Ensure configuration is valid
46        cfg.assert();
47
48        // Create voter
49        let public_key = cfg.crypto.public_key();
50        let (voter, voter_mailbox) = voter::Actor::new(
51            context.with_label("voter"),
52            voter::Config {
53                crypto: cfg.crypto,
54                automaton: cfg.automaton,
55                relay: cfg.relay,
56                reporter: cfg.reporter,
57                supervisor: cfg.supervisor.clone(),
58                partition: cfg.partition,
59                mailbox_size: cfg.mailbox_size,
60                namespace: cfg.namespace.clone(),
61                max_participants: cfg.max_participants,
62                leader_timeout: cfg.leader_timeout,
63                notarization_timeout: cfg.notarization_timeout,
64                nullify_retry: cfg.nullify_retry,
65                activity_timeout: cfg.activity_timeout,
66                skip_timeout: cfg.skip_timeout,
67                replay_buffer: cfg.replay_buffer,
68                write_buffer: cfg.write_buffer,
69                buffer_pool: cfg.buffer_pool,
70            },
71        );
72
73        // Create resolver
74        let (resolver, resolver_mailbox) = resolver::Actor::new(
75            context.with_label("resolver"),
76            resolver::Config {
77                crypto: public_key,
78                supervisor: cfg.supervisor,
79                mailbox_size: cfg.mailbox_size,
80                namespace: cfg.namespace,
81                max_participants: cfg.max_participants,
82                activity_timeout: cfg.activity_timeout,
83                fetch_timeout: cfg.fetch_timeout,
84                fetch_concurrent: cfg.fetch_concurrent,
85                max_fetch_count: cfg.max_fetch_count,
86                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
87            },
88        );
89
90        // Return the engine
91        Self {
92            context,
93
94            voter,
95            voter_mailbox,
96            resolver,
97            resolver_mailbox,
98        }
99    }
100
101    /// Start the `simplex` consensus engine.
102    ///
103    /// This will also rebuild the state of the engine from provided `Journal`.
104    pub fn start(
105        self,
106        voter_network: (
107            impl Sender<PublicKey = C::PublicKey>,
108            impl Receiver<PublicKey = C::PublicKey>,
109        ),
110        resolver_network: (
111            impl Sender<PublicKey = C::PublicKey>,
112            impl Receiver<PublicKey = C::PublicKey>,
113        ),
114    ) -> Handle<()> {
115        self.context
116            .clone()
117            .spawn(|_| self.run(voter_network, resolver_network))
118    }
119
120    async fn run(
121        self,
122        voter_network: (
123            impl Sender<PublicKey = C::PublicKey>,
124            impl Receiver<PublicKey = C::PublicKey>,
125        ),
126        resolver_network: (
127            impl Sender<PublicKey = C::PublicKey>,
128            impl Receiver<PublicKey = C::PublicKey>,
129        ),
130    ) {
131        // Start the voter
132        let (voter_sender, voter_receiver) = voter_network;
133        let mut voter_task = self
134            .voter
135            .start(self.resolver_mailbox, voter_sender, voter_receiver);
136
137        // Start the resolver
138        let (resolver_sender, resolver_receiver) = resolver_network;
139        let mut resolver_task =
140            self.resolver
141                .start(self.voter_mailbox, resolver_sender, resolver_receiver);
142
143        // Wait for the resolver or voter to finish
144        select! {
145            _ = &mut voter_task => {
146                debug!("voter finished");
147                resolver_task.abort();
148            },
149            _ = &mut resolver_task => {
150                debug!("resolver finished");
151                voter_task.abort();
152            },
153        }
154    }
155}