commonware_consensus/simplex/
engine.rs

1use super::{
2    actors::{resolver, voter},
3    config::Config,
4    types::{Activity, Context, View},
5};
6use crate::{Automaton, Relay, Reporter, Supervisor};
7use commonware_cryptography::{Digest, Signer};
8use commonware_macros::select;
9use commonware_p2p::{Receiver, Sender};
10use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
11use governor::clock::Clock as GClock;
12use rand::{CryptoRng, Rng};
13use tracing::debug;
14
15/// Instance of `simplex` consensus engine.
16pub struct Engine<
17    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
18    C: Signer,
19    D: Digest,
20    A: Automaton<Context = Context<D>, Digest = D>,
21    R: Relay<Digest = D>,
22    F: Reporter<Activity = Activity<C::Signature, D>>,
23    S: Supervisor<Index = View, PublicKey = C::PublicKey>,
24> {
25    context: E,
26
27    voter: voter::Actor<E, C, D, A, R, F, S>,
28    voter_mailbox: voter::Mailbox<C::Signature, D>,
29    resolver: resolver::Actor<E, C::PublicKey, D, S>,
30    resolver_mailbox: resolver::Mailbox<C::Signature, D>,
31}
32
33impl<
34        E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
35        C: Signer,
36        D: Digest,
37        A: Automaton<Context = Context<D>, Digest = D>,
38        R: Relay<Digest = D>,
39        F: Reporter<Activity = Activity<C::Signature, D>>,
40        S: Supervisor<Index = View, PublicKey = C::PublicKey>,
41    > Engine<E, C, D, A, R, F, S>
42{
43    /// Create a new `simplex` consensus engine.
44    pub fn new(context: E, cfg: Config<C, D, A, R, F, S>) -> Self {
45        // Ensure configuration is valid
46        cfg.assert();
47
48        // Create voter
49        let public_key = cfg.crypto.public_key();
50        let (voter, voter_mailbox) = voter::Actor::new(
51            context.with_label("voter"),
52            voter::Config {
53                crypto: cfg.crypto,
54                automaton: cfg.automaton,
55                relay: cfg.relay,
56                reporter: cfg.reporter,
57                supervisor: cfg.supervisor.clone(),
58                partition: cfg.partition,
59                compression: cfg.compression,
60                mailbox_size: cfg.mailbox_size,
61                namespace: cfg.namespace.clone(),
62                max_participants: cfg.max_participants,
63                leader_timeout: cfg.leader_timeout,
64                notarization_timeout: cfg.notarization_timeout,
65                nullify_retry: cfg.nullify_retry,
66                activity_timeout: cfg.activity_timeout,
67                skip_timeout: cfg.skip_timeout,
68                replay_buffer: cfg.replay_buffer,
69                write_buffer: cfg.write_buffer,
70                buffer_pool: cfg.buffer_pool,
71            },
72        );
73
74        // Create resolver
75        let (resolver, resolver_mailbox) = resolver::Actor::new(
76            context.with_label("resolver"),
77            resolver::Config {
78                crypto: public_key,
79                supervisor: cfg.supervisor,
80                mailbox_size: cfg.mailbox_size,
81                namespace: cfg.namespace,
82                max_participants: cfg.max_participants,
83                activity_timeout: cfg.activity_timeout,
84                fetch_timeout: cfg.fetch_timeout,
85                fetch_concurrent: cfg.fetch_concurrent,
86                max_fetch_count: cfg.max_fetch_count,
87                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
88            },
89        );
90
91        // Return the engine
92        Self {
93            context,
94
95            voter,
96            voter_mailbox,
97            resolver,
98            resolver_mailbox,
99        }
100    }
101
102    /// Start the `simplex` consensus engine.
103    ///
104    /// This will also rebuild the state of the engine from provided `Journal`.
105    pub fn start(
106        self,
107        voter_network: (
108            impl Sender<PublicKey = C::PublicKey>,
109            impl Receiver<PublicKey = C::PublicKey>,
110        ),
111        resolver_network: (
112            impl Sender<PublicKey = C::PublicKey>,
113            impl Receiver<PublicKey = C::PublicKey>,
114        ),
115    ) -> Handle<()> {
116        self.context
117            .clone()
118            .spawn(|_| self.run(voter_network, resolver_network))
119    }
120
121    async fn run(
122        self,
123        voter_network: (
124            impl Sender<PublicKey = C::PublicKey>,
125            impl Receiver<PublicKey = C::PublicKey>,
126        ),
127        resolver_network: (
128            impl Sender<PublicKey = C::PublicKey>,
129            impl Receiver<PublicKey = C::PublicKey>,
130        ),
131    ) {
132        // Start the voter
133        let (voter_sender, voter_receiver) = voter_network;
134        let mut voter_task = self
135            .voter
136            .start(self.resolver_mailbox, voter_sender, voter_receiver);
137
138        // Start the resolver
139        let (resolver_sender, resolver_receiver) = resolver_network;
140        let mut resolver_task =
141            self.resolver
142                .start(self.voter_mailbox, resolver_sender, resolver_receiver);
143
144        // Wait for the resolver or voter to finish
145        select! {
146            _ = &mut voter_task => {
147                debug!("voter finished");
148                resolver_task.abort();
149            },
150            _ = &mut resolver_task => {
151                debug!("resolver finished");
152                voter_task.abort();
153            },
154        }
155    }
156}