1use super::{
2 actors::{batcher, resolver, voter},
3 config::Config,
4 elector::Config as Elector,
5 types::{Activity, Context},
6};
7use crate::{simplex::scheme::Scheme, CertifiableAutomaton, Relay, Reporter};
8use commonware_cryptography::Digest;
9use commonware_macros::select;
10use commonware_p2p::{Blocker, Receiver, Sender};
11use commonware_parallel::Strategy;
12use commonware_runtime::{
13 spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
14};
15use rand_core::CryptoRngCore;
16use tracing::debug;
17
18pub struct Engine<
20 E: BufferPooler + Clock + CryptoRngCore + Spawner + Storage + Metrics,
21 S: Scheme<D>,
22 L: Elector<S>,
23 B: Blocker<PublicKey = S::PublicKey>,
24 D: Digest,
25 A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
26 R: Relay<Digest = D>,
27 F: Reporter<Activity = Activity<S, D>>,
28 T: Strategy,
29> {
30 context: ContextCell<E>,
31
32 voter: voter::Actor<E, S, L, B, D, A, R, F>,
33 voter_mailbox: voter::Mailbox<S, D>,
34
35 batcher: batcher::Actor<E, S, B, D, F, T>,
36 batcher_mailbox: batcher::Mailbox<S, D>,
37
38 resolver: resolver::Actor<E, S, B, D, T>,
39 resolver_mailbox: resolver::Mailbox<S, D>,
40}
41
42impl<
43 E: BufferPooler + Clock + CryptoRngCore + Spawner + Storage + Metrics,
44 S: Scheme<D>,
45 L: Elector<S>,
46 B: Blocker<PublicKey = S::PublicKey>,
47 D: Digest,
48 A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
49 R: Relay<Digest = D>,
50 F: Reporter<Activity = Activity<S, D>>,
51 T: Strategy,
52 > Engine<E, S, L, B, D, A, R, F, T>
53{
54 pub fn new(context: E, cfg: Config<S, L, B, D, A, R, F, T>) -> Self {
56 cfg.assert();
58
59 let (batcher, batcher_mailbox) = batcher::Actor::new(
61 context.with_label("batcher"),
62 batcher::Config {
63 scheme: cfg.scheme.clone(),
64 blocker: cfg.blocker.clone(),
65 reporter: cfg.reporter.clone(),
66 strategy: cfg.strategy.clone(),
67 epoch: cfg.epoch,
68 mailbox_size: cfg.mailbox_size,
69 activity_timeout: cfg.activity_timeout,
70 skip_timeout: cfg.skip_timeout,
71 },
72 );
73
74 let (voter, voter_mailbox) = voter::Actor::new(
76 context.with_label("voter"),
77 voter::Config {
78 scheme: cfg.scheme.clone(),
79 elector: cfg.elector,
80 blocker: cfg.blocker.clone(),
81 automaton: cfg.automaton,
82 relay: cfg.relay,
83 reporter: cfg.reporter,
84 partition: cfg.partition,
85 mailbox_size: cfg.mailbox_size,
86 epoch: cfg.epoch,
87 leader_timeout: cfg.leader_timeout,
88 certification_timeout: cfg.certification_timeout,
89 timeout_retry: cfg.timeout_retry,
90 activity_timeout: cfg.activity_timeout,
91 replay_buffer: cfg.replay_buffer,
92 write_buffer: cfg.write_buffer,
93 page_cache: cfg.page_cache,
94 },
95 );
96
97 let (resolver, resolver_mailbox) = resolver::Actor::new(
99 context.with_label("resolver"),
100 resolver::Config {
101 blocker: cfg.blocker,
102 scheme: cfg.scheme,
103 strategy: cfg.strategy,
104 mailbox_size: cfg.mailbox_size,
105 epoch: cfg.epoch,
106 fetch_concurrent: cfg.fetch_concurrent,
107 fetch_timeout: cfg.fetch_timeout,
108 },
109 );
110
111 Self {
113 context: ContextCell::new(context),
114
115 voter,
116 voter_mailbox,
117
118 batcher,
119 batcher_mailbox,
120
121 resolver,
122 resolver_mailbox,
123 }
124 }
125
126 pub fn start(
164 mut self,
165 vote_network: (
166 impl Sender<PublicKey = S::PublicKey>,
167 impl Receiver<PublicKey = S::PublicKey>,
168 ),
169 certificate_network: (
170 impl Sender<PublicKey = S::PublicKey>,
171 impl Receiver<PublicKey = S::PublicKey>,
172 ),
173 resolver_network: (
174 impl Sender<PublicKey = S::PublicKey>,
175 impl Receiver<PublicKey = S::PublicKey>,
176 ),
177 ) -> Handle<()> {
178 spawn_cell!(
179 self.context,
180 self.run(vote_network, certificate_network, resolver_network)
181 .await
182 )
183 }
184
185 async fn run(
186 self,
187 vote_network: (
188 impl Sender<PublicKey = S::PublicKey>,
189 impl Receiver<PublicKey = S::PublicKey>,
190 ),
191 certificate_network: (
192 impl Sender<PublicKey = S::PublicKey>,
193 impl Receiver<PublicKey = S::PublicKey>,
194 ),
195 resolver_network: (
196 impl Sender<PublicKey = S::PublicKey>,
197 impl Receiver<PublicKey = S::PublicKey>,
198 ),
199 ) {
200 let (vote_sender, vote_receiver) = vote_network;
203 let (certificate_sender, certificate_receiver) = certificate_network;
204 let mut batcher_task = self.batcher.start(
205 self.voter_mailbox.clone(),
206 vote_receiver,
207 certificate_receiver,
208 );
209
210 let (resolver_sender, resolver_receiver) = resolver_network;
212 let mut resolver_task =
213 self.resolver
214 .start(self.voter_mailbox, resolver_sender, resolver_receiver);
215
216 let mut voter_task = self.voter.start(
218 self.batcher_mailbox,
219 self.resolver_mailbox,
220 vote_sender,
221 certificate_sender,
222 );
223
224 let mut shutdown = self.context.stopped();
226 select! {
227 _ = &mut shutdown => {
228 debug!("context shutdown, stopping engine");
229 },
230 _ = &mut voter_task => {
231 panic!("voter should not finish");
232 },
233 _ = &mut batcher_task => {
234 panic!("batcher should not finish");
235 },
236 _ = &mut resolver_task => {
237 panic!("resolver should not finish");
238 },
239 }
240 }
241}