1use super::{
2 actors::{batcher, resolver, voter},
3 config::Config,
4 elector::Config as Elector,
5 types::{Activity, Context},
6};
7use crate::{simplex::scheme::Scheme, CertifiableAutomaton, Relay, Reporter};
8use commonware_cryptography::Digest;
9use commonware_macros::select;
10use commonware_p2p::{Blocker, Receiver, Sender};
11use commonware_parallel::Strategy;
12use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage};
13use rand_core::CryptoRngCore;
14use tracing::debug;
15
16pub struct Engine<
18 E: Clock + CryptoRngCore + Spawner + Storage + Metrics,
19 S: Scheme<D>,
20 L: Elector<S>,
21 B: Blocker<PublicKey = S::PublicKey>,
22 D: Digest,
23 A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
24 R: Relay<Digest = D>,
25 F: Reporter<Activity = Activity<S, D>>,
26 T: Strategy,
27> {
28 context: ContextCell<E>,
29
30 voter: voter::Actor<E, S, L, B, D, A, R, F>,
31 voter_mailbox: voter::Mailbox<S, D>,
32
33 batcher: batcher::Actor<E, S, B, D, F, T>,
34 batcher_mailbox: batcher::Mailbox<S, D>,
35
36 resolver: resolver::Actor<E, S, B, D, T>,
37 resolver_mailbox: resolver::Mailbox<S, D>,
38}
39
40impl<
41 E: Clock + CryptoRngCore + Spawner + Storage + Metrics,
42 S: Scheme<D>,
43 L: Elector<S>,
44 B: Blocker<PublicKey = S::PublicKey>,
45 D: Digest,
46 A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
47 R: Relay<Digest = D>,
48 F: Reporter<Activity = Activity<S, D>>,
49 T: Strategy,
50 > Engine<E, S, L, B, D, A, R, F, T>
51{
52 pub fn new(context: E, cfg: Config<S, L, B, D, A, R, F, T>) -> Self {
54 cfg.assert();
56
57 let (batcher, batcher_mailbox) = batcher::Actor::new(
59 context.with_label("batcher"),
60 batcher::Config {
61 scheme: cfg.scheme.clone(),
62 blocker: cfg.blocker.clone(),
63 reporter: cfg.reporter.clone(),
64 strategy: cfg.strategy.clone(),
65 epoch: cfg.epoch,
66 mailbox_size: cfg.mailbox_size,
67 activity_timeout: cfg.activity_timeout,
68 skip_timeout: cfg.skip_timeout,
69 },
70 );
71
72 let (voter, voter_mailbox) = voter::Actor::new(
74 context.with_label("voter"),
75 voter::Config {
76 scheme: cfg.scheme.clone(),
77 elector: cfg.elector,
78 blocker: cfg.blocker.clone(),
79 automaton: cfg.automaton,
80 relay: cfg.relay,
81 reporter: cfg.reporter,
82 partition: cfg.partition,
83 mailbox_size: cfg.mailbox_size,
84 epoch: cfg.epoch,
85 leader_timeout: cfg.leader_timeout,
86 notarization_timeout: cfg.notarization_timeout,
87 nullify_retry: cfg.nullify_retry,
88 activity_timeout: cfg.activity_timeout,
89 replay_buffer: cfg.replay_buffer,
90 write_buffer: cfg.write_buffer,
91 buffer_pool: cfg.buffer_pool,
92 },
93 );
94
95 let (resolver, resolver_mailbox) = resolver::Actor::new(
97 context.with_label("resolver"),
98 resolver::Config {
99 blocker: cfg.blocker,
100 scheme: cfg.scheme,
101 strategy: cfg.strategy,
102 mailbox_size: cfg.mailbox_size,
103 epoch: cfg.epoch,
104 fetch_concurrent: cfg.fetch_concurrent,
105 fetch_timeout: cfg.fetch_timeout,
106 },
107 );
108
109 Self {
111 context: ContextCell::new(context),
112
113 voter,
114 voter_mailbox,
115
116 batcher,
117 batcher_mailbox,
118
119 resolver,
120 resolver_mailbox,
121 }
122 }
123
124 pub fn start(
162 mut self,
163 vote_network: (
164 impl Sender<PublicKey = S::PublicKey>,
165 impl Receiver<PublicKey = S::PublicKey>,
166 ),
167 certificate_network: (
168 impl Sender<PublicKey = S::PublicKey>,
169 impl Receiver<PublicKey = S::PublicKey>,
170 ),
171 resolver_network: (
172 impl Sender<PublicKey = S::PublicKey>,
173 impl Receiver<PublicKey = S::PublicKey>,
174 ),
175 ) -> Handle<()> {
176 spawn_cell!(
177 self.context,
178 self.run(vote_network, certificate_network, resolver_network)
179 .await
180 )
181 }
182
183 async fn run(
184 self,
185 vote_network: (
186 impl Sender<PublicKey = S::PublicKey>,
187 impl Receiver<PublicKey = S::PublicKey>,
188 ),
189 certificate_network: (
190 impl Sender<PublicKey = S::PublicKey>,
191 impl Receiver<PublicKey = S::PublicKey>,
192 ),
193 resolver_network: (
194 impl Sender<PublicKey = S::PublicKey>,
195 impl Receiver<PublicKey = S::PublicKey>,
196 ),
197 ) {
198 let (vote_sender, vote_receiver) = vote_network;
201 let (certificate_sender, certificate_receiver) = certificate_network;
202 let mut batcher_task = self.batcher.start(
203 self.voter_mailbox.clone(),
204 vote_receiver,
205 certificate_receiver,
206 );
207
208 let (resolver_sender, resolver_receiver) = resolver_network;
210 let mut resolver_task =
211 self.resolver
212 .start(self.voter_mailbox, resolver_sender, resolver_receiver);
213
214 let mut voter_task = self.voter.start(
216 self.batcher_mailbox,
217 self.resolver_mailbox,
218 vote_sender,
219 certificate_sender,
220 );
221
222 let mut shutdown = self.context.stopped();
224 select! {
225 _ = &mut shutdown => {
226 debug!("context shutdown, stopping engine");
227 },
228 _ = &mut voter_task => {
229 panic!("voter should not finish");
230 },
231 _ = &mut batcher_task => {
232 panic!("batcher should not finish");
233 },
234 _ = &mut resolver_task => {
235 panic!("resolver should not finish");
236 },
237 }
238 }
239}