commonware_consensus/simplex/
engine.rs1use super::{
2 actors::{resolver, voter},
3 config::Config,
4 types::{Activity, Context, View},
5};
6use crate::{Automaton, Relay, Reporter, Supervisor};
7use commonware_cryptography::{Digest, Signer};
8use commonware_macros::select;
9use commonware_p2p::{Receiver, Sender};
10use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
11use governor::clock::Clock as GClock;
12use rand::{CryptoRng, Rng};
13use tracing::debug;
14
15pub struct Engine<
17 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
18 C: Signer,
19 D: Digest,
20 A: Automaton<Context = Context<D>, Digest = D>,
21 R: Relay<Digest = D>,
22 F: Reporter<Activity = Activity<C::Signature, D>>,
23 S: Supervisor<Index = View, PublicKey = C::PublicKey>,
24> {
25 context: E,
26
27 voter: voter::Actor<E, C, D, A, R, F, S>,
28 voter_mailbox: voter::Mailbox<C::Signature, D>,
29 resolver: resolver::Actor<E, C::PublicKey, D, S>,
30 resolver_mailbox: resolver::Mailbox<C::Signature, D>,
31}
32
33impl<
34 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
35 C: Signer,
36 D: Digest,
37 A: Automaton<Context = Context<D>, Digest = D>,
38 R: Relay<Digest = D>,
39 F: Reporter<Activity = Activity<C::Signature, D>>,
40 S: Supervisor<Index = View, PublicKey = C::PublicKey>,
41 > Engine<E, C, D, A, R, F, S>
42{
43 pub fn new(context: E, cfg: Config<C, D, A, R, F, S>) -> Self {
45 cfg.assert();
47
48 let public_key = cfg.crypto.public_key();
50 let (voter, voter_mailbox) = voter::Actor::new(
51 context.with_label("voter"),
52 voter::Config {
53 crypto: cfg.crypto,
54 automaton: cfg.automaton,
55 relay: cfg.relay,
56 reporter: cfg.reporter,
57 supervisor: cfg.supervisor.clone(),
58 partition: cfg.partition,
59 mailbox_size: cfg.mailbox_size,
60 namespace: cfg.namespace.clone(),
61 max_participants: cfg.max_participants,
62 leader_timeout: cfg.leader_timeout,
63 notarization_timeout: cfg.notarization_timeout,
64 nullify_retry: cfg.nullify_retry,
65 activity_timeout: cfg.activity_timeout,
66 skip_timeout: cfg.skip_timeout,
67 replay_buffer: cfg.replay_buffer,
68 write_buffer: cfg.write_buffer,
69 buffer_pool: cfg.buffer_pool,
70 },
71 );
72
73 let (resolver, resolver_mailbox) = resolver::Actor::new(
75 context.with_label("resolver"),
76 resolver::Config {
77 crypto: public_key,
78 supervisor: cfg.supervisor,
79 mailbox_size: cfg.mailbox_size,
80 namespace: cfg.namespace,
81 max_participants: cfg.max_participants,
82 activity_timeout: cfg.activity_timeout,
83 fetch_timeout: cfg.fetch_timeout,
84 fetch_concurrent: cfg.fetch_concurrent,
85 max_fetch_count: cfg.max_fetch_count,
86 fetch_rate_per_peer: cfg.fetch_rate_per_peer,
87 },
88 );
89
90 Self {
92 context,
93
94 voter,
95 voter_mailbox,
96 resolver,
97 resolver_mailbox,
98 }
99 }
100
101 pub fn start(
105 self,
106 voter_network: (
107 impl Sender<PublicKey = C::PublicKey>,
108 impl Receiver<PublicKey = C::PublicKey>,
109 ),
110 resolver_network: (
111 impl Sender<PublicKey = C::PublicKey>,
112 impl Receiver<PublicKey = C::PublicKey>,
113 ),
114 ) -> Handle<()> {
115 self.context
116 .clone()
117 .spawn(|_| self.run(voter_network, resolver_network))
118 }
119
120 async fn run(
121 self,
122 voter_network: (
123 impl Sender<PublicKey = C::PublicKey>,
124 impl Receiver<PublicKey = C::PublicKey>,
125 ),
126 resolver_network: (
127 impl Sender<PublicKey = C::PublicKey>,
128 impl Receiver<PublicKey = C::PublicKey>,
129 ),
130 ) {
131 let (voter_sender, voter_receiver) = voter_network;
133 let mut voter_task = self
134 .voter
135 .start(self.resolver_mailbox, voter_sender, voter_receiver);
136
137 let (resolver_sender, resolver_receiver) = resolver_network;
139 let mut resolver_task =
140 self.resolver
141 .start(self.voter_mailbox, resolver_sender, resolver_receiver);
142
143 select! {
145 _ = &mut voter_task => {
146 debug!("voter finished");
147 resolver_task.abort();
148 },
149 _ = &mut resolver_task => {
150 debug!("resolver finished");
151 voter_task.abort();
152 },
153 }
154 }
155}