1use super::{
2 actors::{batcher, resolver, voter},
3 config::Config,
4 elector::Config as Elector,
5 types::{Activity, Context},
6};
7use crate::{simplex::scheme::Scheme, CertifiableAutomaton, Relay, Reporter};
8use commonware_cryptography::Digest;
9use commonware_macros::select;
10use commonware_p2p::{Blocker, Receiver, Sender};
11use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage};
12use rand::{CryptoRng, Rng};
13use tracing::debug;
14
15pub struct Engine<
17 E: Clock + Rng + CryptoRng + Spawner + Storage + Metrics,
18 S: Scheme<D>,
19 L: Elector<S>,
20 B: Blocker<PublicKey = S::PublicKey>,
21 D: Digest,
22 A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
23 R: Relay<Digest = D>,
24 F: Reporter<Activity = Activity<S, D>>,
25> {
26 context: ContextCell<E>,
27
28 voter: voter::Actor<E, S, L, B, D, A, R, F>,
29 voter_mailbox: voter::Mailbox<S, D>,
30
31 batcher: batcher::Actor<E, S, B, D, F>,
32 batcher_mailbox: batcher::Mailbox<S, D>,
33
34 resolver: resolver::Actor<E, S, B, D>,
35 resolver_mailbox: resolver::Mailbox<S, D>,
36}
37
38impl<
39 E: Clock + Rng + CryptoRng + Spawner + Storage + Metrics,
40 S: Scheme<D>,
41 L: Elector<S>,
42 B: Blocker<PublicKey = S::PublicKey>,
43 D: Digest,
44 A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
45 R: Relay<Digest = D>,
46 F: Reporter<Activity = Activity<S, D>>,
47 > Engine<E, S, L, B, D, A, R, F>
48{
49 pub fn new(context: E, cfg: Config<S, L, B, D, A, R, F>) -> Self {
51 cfg.assert();
53
54 let (batcher, batcher_mailbox) = batcher::Actor::new(
56 context.with_label("batcher"),
57 batcher::Config {
58 scheme: cfg.scheme.clone(),
59 blocker: cfg.blocker.clone(),
60 reporter: cfg.reporter.clone(),
61 epoch: cfg.epoch,
62 namespace: cfg.namespace.clone(),
63 mailbox_size: cfg.mailbox_size,
64 activity_timeout: cfg.activity_timeout,
65 skip_timeout: cfg.skip_timeout,
66 },
67 );
68
69 let (voter, voter_mailbox) = voter::Actor::new(
71 context.with_label("voter"),
72 voter::Config {
73 scheme: cfg.scheme.clone(),
74 elector: cfg.elector,
75 blocker: cfg.blocker.clone(),
76 automaton: cfg.automaton,
77 relay: cfg.relay,
78 reporter: cfg.reporter,
79 partition: cfg.partition,
80 mailbox_size: cfg.mailbox_size,
81 epoch: cfg.epoch,
82 namespace: cfg.namespace.clone(),
83 leader_timeout: cfg.leader_timeout,
84 notarization_timeout: cfg.notarization_timeout,
85 nullify_retry: cfg.nullify_retry,
86 activity_timeout: cfg.activity_timeout,
87 replay_buffer: cfg.replay_buffer,
88 write_buffer: cfg.write_buffer,
89 buffer_pool: cfg.buffer_pool,
90 },
91 );
92
93 let (resolver, resolver_mailbox) = resolver::Actor::new(
95 context.with_label("resolver"),
96 resolver::Config {
97 blocker: cfg.blocker,
98 scheme: cfg.scheme,
99 mailbox_size: cfg.mailbox_size,
100 epoch: cfg.epoch,
101 namespace: cfg.namespace,
102 fetch_concurrent: cfg.fetch_concurrent,
103 fetch_timeout: cfg.fetch_timeout,
104 },
105 );
106
107 Self {
109 context: ContextCell::new(context),
110
111 voter,
112 voter_mailbox,
113
114 batcher,
115 batcher_mailbox,
116
117 resolver,
118 resolver_mailbox,
119 }
120 }
121
122 pub fn start(
160 mut self,
161 vote_network: (
162 impl Sender<PublicKey = S::PublicKey>,
163 impl Receiver<PublicKey = S::PublicKey>,
164 ),
165 certificate_network: (
166 impl Sender<PublicKey = S::PublicKey>,
167 impl Receiver<PublicKey = S::PublicKey>,
168 ),
169 resolver_network: (
170 impl Sender<PublicKey = S::PublicKey>,
171 impl Receiver<PublicKey = S::PublicKey>,
172 ),
173 ) -> Handle<()> {
174 spawn_cell!(
175 self.context,
176 self.run(vote_network, certificate_network, resolver_network)
177 .await
178 )
179 }
180
181 async fn run(
182 self,
183 vote_network: (
184 impl Sender<PublicKey = S::PublicKey>,
185 impl Receiver<PublicKey = S::PublicKey>,
186 ),
187 certificate_network: (
188 impl Sender<PublicKey = S::PublicKey>,
189 impl Receiver<PublicKey = S::PublicKey>,
190 ),
191 resolver_network: (
192 impl Sender<PublicKey = S::PublicKey>,
193 impl Receiver<PublicKey = S::PublicKey>,
194 ),
195 ) {
196 let (vote_sender, vote_receiver) = vote_network;
199 let (certificate_sender, certificate_receiver) = certificate_network;
200 let mut batcher_task = self.batcher.start(
201 self.voter_mailbox.clone(),
202 vote_receiver,
203 certificate_receiver,
204 );
205
206 let (resolver_sender, resolver_receiver) = resolver_network;
208 let mut resolver_task =
209 self.resolver
210 .start(self.voter_mailbox, resolver_sender, resolver_receiver);
211
212 let mut voter_task = self.voter.start(
214 self.batcher_mailbox,
215 self.resolver_mailbox,
216 vote_sender,
217 certificate_sender,
218 );
219
220 let mut shutdown = self.context.stopped();
222 select! {
223 _ = &mut shutdown => {
224 debug!("context shutdown, stopping engine");
225 },
226 _ = &mut voter_task => {
227 panic!("voter should not finish");
228 },
229 _ = &mut batcher_task => {
230 panic!("batcher should not finish");
231 },
232 _ = &mut resolver_task => {
233 panic!("resolver should not finish");
234 },
235 }
236 }
237}