1use super::{
2 actors::{batcher, resolver, voter},
3 config::Config,
4 elector::Config as Elector,
5 types::{Activity, Context},
6};
7use crate::{
8 simplex::{scheme::Scheme, Plan},
9 CertifiableAutomaton, Relay, Reporter,
10};
11use commonware_cryptography::Digest;
12use commonware_macros::select;
13use commonware_p2p::{Blocker, Receiver, Sender};
14use commonware_parallel::Strategy;
15use commonware_runtime::{
16 spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
17};
18use rand_core::CryptoRngCore;
19use tracing::debug;
20
21pub struct Engine<
23 E: BufferPooler + Clock + CryptoRngCore + Spawner + Storage + Metrics,
24 S: Scheme<D>,
25 L: Elector<S>,
26 B: Blocker<PublicKey = S::PublicKey>,
27 D: Digest,
28 A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
29 R: Relay<Digest = D, PublicKey = S::PublicKey, Plan = Plan<S::PublicKey>>,
30 F: Reporter<Activity = Activity<S, D>>,
31 T: Strategy,
32> {
33 context: ContextCell<E>,
34
35 voter: voter::Actor<E, S, L, B, D, A, R, F>,
36 voter_mailbox: voter::Mailbox<S, D>,
37
38 batcher: batcher::Actor<E, S, B, D, F, R, T>,
39 batcher_mailbox: batcher::Mailbox<S, D>,
40
41 resolver: resolver::Actor<E, S, B, D, T>,
42 resolver_mailbox: resolver::Mailbox<S, D>,
43}
44
45impl<
46 E: BufferPooler + Clock + CryptoRngCore + Spawner + Storage + Metrics,
47 S: Scheme<D>,
48 L: Elector<S>,
49 B: Blocker<PublicKey = S::PublicKey>,
50 D: Digest,
51 A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
52 R: Relay<Digest = D, PublicKey = S::PublicKey, Plan = Plan<S::PublicKey>>,
53 F: Reporter<Activity = Activity<S, D>>,
54 T: Strategy,
55 > Engine<E, S, L, B, D, A, R, F, T>
56{
57 pub fn new(context: E, cfg: Config<S, L, B, D, A, R, F, T>) -> Self {
59 cfg.assert();
61
62 let (batcher, batcher_mailbox) = batcher::Actor::new(
64 context.with_label("batcher"),
65 batcher::Config {
66 scheme: cfg.scheme.clone(),
67 blocker: cfg.blocker.clone(),
68 reporter: cfg.reporter.clone(),
69 relay: cfg.relay.clone(),
70 strategy: cfg.strategy.clone(),
71 epoch: cfg.epoch,
72 mailbox_size: cfg.mailbox_size,
73 activity_timeout: cfg.activity_timeout,
74 skip_timeout: cfg.skip_timeout,
75 forwarding: cfg.forwarding,
76 },
77 );
78
79 let (voter, voter_mailbox) = voter::Actor::new(
81 context.with_label("voter"),
82 voter::Config {
83 scheme: cfg.scheme.clone(),
84 elector: cfg.elector,
85 blocker: cfg.blocker.clone(),
86 automaton: cfg.automaton,
87 relay: cfg.relay,
88 reporter: cfg.reporter,
89 partition: cfg.partition,
90 mailbox_size: cfg.mailbox_size,
91 epoch: cfg.epoch,
92 leader_timeout: cfg.leader_timeout,
93 certification_timeout: cfg.certification_timeout,
94 timeout_retry: cfg.timeout_retry,
95 activity_timeout: cfg.activity_timeout,
96 replay_buffer: cfg.replay_buffer,
97 write_buffer: cfg.write_buffer,
98 page_cache: cfg.page_cache,
99 },
100 );
101
102 let (resolver, resolver_mailbox) = resolver::Actor::new(
104 context.with_label("resolver"),
105 resolver::Config {
106 blocker: cfg.blocker,
107 scheme: cfg.scheme,
108 strategy: cfg.strategy,
109 mailbox_size: cfg.mailbox_size,
110 epoch: cfg.epoch,
111 fetch_concurrent: cfg.fetch_concurrent,
112 fetch_timeout: cfg.fetch_timeout,
113 },
114 );
115
116 Self {
118 context: ContextCell::new(context),
119
120 voter,
121 voter_mailbox,
122
123 batcher,
124 batcher_mailbox,
125
126 resolver,
127 resolver_mailbox,
128 }
129 }
130
131 pub fn start(
169 mut self,
170 vote_network: (
171 impl Sender<PublicKey = S::PublicKey>,
172 impl Receiver<PublicKey = S::PublicKey>,
173 ),
174 certificate_network: (
175 impl Sender<PublicKey = S::PublicKey>,
176 impl Receiver<PublicKey = S::PublicKey>,
177 ),
178 resolver_network: (
179 impl Sender<PublicKey = S::PublicKey>,
180 impl Receiver<PublicKey = S::PublicKey>,
181 ),
182 ) -> Handle<()> {
183 spawn_cell!(
184 self.context,
185 self.run(vote_network, certificate_network, resolver_network)
186 .await
187 )
188 }
189
190 async fn run(
191 self,
192 vote_network: (
193 impl Sender<PublicKey = S::PublicKey>,
194 impl Receiver<PublicKey = S::PublicKey>,
195 ),
196 certificate_network: (
197 impl Sender<PublicKey = S::PublicKey>,
198 impl Receiver<PublicKey = S::PublicKey>,
199 ),
200 resolver_network: (
201 impl Sender<PublicKey = S::PublicKey>,
202 impl Receiver<PublicKey = S::PublicKey>,
203 ),
204 ) {
205 let (vote_sender, vote_receiver) = vote_network;
208 let (certificate_sender, certificate_receiver) = certificate_network;
209 let mut batcher_task = self.batcher.start(
210 self.voter_mailbox.clone(),
211 vote_receiver,
212 certificate_receiver,
213 );
214
215 let (resolver_sender, resolver_receiver) = resolver_network;
217 let mut resolver_task =
218 self.resolver
219 .start(self.voter_mailbox, resolver_sender, resolver_receiver);
220
221 let mut voter_task = self.voter.start(
223 self.batcher_mailbox,
224 self.resolver_mailbox,
225 vote_sender,
226 certificate_sender,
227 );
228
229 let mut shutdown = self.context.stopped();
231 select! {
232 _ = &mut shutdown => {
233 debug!("context shutdown, stopping engine");
234 },
235 _ = &mut voter_task => {
236 panic!("voter should not finish");
237 },
238 _ = &mut batcher_task => {
239 panic!("batcher should not finish");
240 },
241 _ = &mut resolver_task => {
242 panic!("resolver should not finish");
243 },
244 }
245 }
246}