1use super::{
2 actors::{batcher, resolver, voter},
3 config::Config,
4 elector::Config as Elector,
5 types::{Activity, Context},
6};
7use crate::{
8 simplex::{scheme::Scheme, Plan},
9 CertifiableAutomaton, Relay, Reporter,
10};
11use commonware_cryptography::Digest;
12use commonware_macros::select;
13use commonware_p2p::{Blocker, Receiver, Sender};
14use commonware_parallel::Strategy;
15use commonware_runtime::{
16 spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
17};
18use rand_core::CryptoRngCore;
19use tracing::debug;
20
21pub struct Engine<
23 E: BufferPooler + Clock + CryptoRngCore + Spawner + Storage + Metrics,
24 S: Scheme<D>,
25 L: Elector<S>,
26 B: Blocker<PublicKey = S::PublicKey>,
27 D: Digest,
28 A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
29 R: Relay<Digest = D, PublicKey = S::PublicKey, Plan = Plan<S::PublicKey>>,
30 F: Reporter<Activity = Activity<S, D>>,
31 T: Strategy,
32> {
33 context: ContextCell<E>,
34
35 voter: voter::Actor<E, S, L, B, D, A, R, F>,
36 voter_mailbox: voter::Mailbox<S, D>,
37
38 batcher: batcher::Actor<E, S, B, D, F, R, T>,
39 batcher_mailbox: batcher::Mailbox<S, D>,
40
41 resolver: resolver::Actor<E, S, B, D, T>,
42 resolver_mailbox: resolver::Mailbox<S, D>,
43}
44
45impl<
46 E: BufferPooler + Clock + CryptoRngCore + Spawner + Storage + Metrics,
47 S: Scheme<D>,
48 L: Elector<S>,
49 B: Blocker<PublicKey = S::PublicKey>,
50 D: Digest,
51 A: CertifiableAutomaton<Context = Context<D, S::PublicKey>, Digest = D>,
52 R: Relay<Digest = D, PublicKey = S::PublicKey, Plan = Plan<S::PublicKey>>,
53 F: Reporter<Activity = Activity<S, D>>,
54 T: Strategy,
55 > Engine<E, S, L, B, D, A, R, F, T>
56{
57 pub fn new(mut context: E, cfg: Config<S, L, B, D, A, R, F, T>) -> Self {
59 cfg.assert(&mut context);
61
62 let (batcher, batcher_mailbox) = batcher::Actor::new(
64 context.child("batcher"),
65 batcher::Config {
66 scheme: cfg.scheme.clone(),
67 blocker: cfg.blocker.clone(),
68 reporter: cfg.reporter.clone(),
69 relay: cfg.relay.clone(),
70 strategy: cfg.strategy.clone(),
71 epoch: cfg.epoch,
72 mailbox_size: cfg.mailbox_size,
73 activity_timeout: cfg.activity_timeout,
74 skip_timeout: cfg.skip_timeout,
75 forwarding: cfg.forwarding,
76 },
77 );
78
79 let (voter, voter_mailbox) = voter::Actor::new(
81 context.child("voter"),
82 voter::Config {
83 scheme: cfg.scheme.clone(),
84 elector: cfg.elector,
85 blocker: cfg.blocker.clone(),
86 automaton: cfg.automaton,
87 relay: cfg.relay,
88 reporter: cfg.reporter,
89 partition: cfg.partition,
90 mailbox_size: cfg.mailbox_size,
91 epoch: cfg.epoch,
92 floor: cfg.floor,
93 leader_timeout: cfg.leader_timeout,
94 certification_timeout: cfg.certification_timeout,
95 timeout_retry: cfg.timeout_retry,
96 activity_timeout: cfg.activity_timeout,
97 replay_buffer: cfg.replay_buffer,
98 write_buffer: cfg.write_buffer,
99 page_cache: cfg.page_cache,
100 },
101 );
102
103 let (resolver, resolver_mailbox) = resolver::Actor::new(
105 context.child("resolver"),
106 resolver::Config {
107 blocker: cfg.blocker,
108 scheme: cfg.scheme,
109 strategy: cfg.strategy,
110 mailbox_size: cfg.mailbox_size,
111 epoch: cfg.epoch,
112 fetch_concurrent: cfg.fetch_concurrent,
113 fetch_timeout: cfg.fetch_timeout,
114 },
115 );
116
117 Self {
119 context: ContextCell::new(context),
120
121 voter,
122 voter_mailbox,
123
124 batcher,
125 batcher_mailbox,
126
127 resolver,
128 resolver_mailbox,
129 }
130 }
131
132 pub fn start(
170 mut self,
171 vote_network: (
172 impl Sender<PublicKey = S::PublicKey>,
173 impl Receiver<PublicKey = S::PublicKey>,
174 ),
175 certificate_network: (
176 impl Sender<PublicKey = S::PublicKey>,
177 impl Receiver<PublicKey = S::PublicKey>,
178 ),
179 resolver_network: (
180 impl Sender<PublicKey = S::PublicKey>,
181 impl Receiver<PublicKey = S::PublicKey>,
182 ),
183 ) -> Handle<()> {
184 spawn_cell!(
185 self.context,
186 self.run(vote_network, certificate_network, resolver_network)
187 )
188 }
189
190 async fn run(
191 self,
192 vote_network: (
193 impl Sender<PublicKey = S::PublicKey>,
194 impl Receiver<PublicKey = S::PublicKey>,
195 ),
196 certificate_network: (
197 impl Sender<PublicKey = S::PublicKey>,
198 impl Receiver<PublicKey = S::PublicKey>,
199 ),
200 resolver_network: (
201 impl Sender<PublicKey = S::PublicKey>,
202 impl Receiver<PublicKey = S::PublicKey>,
203 ),
204 ) {
205 let (vote_sender, vote_receiver) = vote_network;
208 let (certificate_sender, certificate_receiver) = certificate_network;
209 let mut batcher_task = self.batcher.start(
210 self.voter_mailbox.clone(),
211 vote_receiver,
212 certificate_receiver,
213 );
214
215 let (resolver_sender, resolver_receiver) = resolver_network;
217 let mut resolver_task =
218 self.resolver
219 .start(self.voter_mailbox, resolver_sender, resolver_receiver);
220
221 let mut voter_task = self.voter.start(
223 self.batcher_mailbox,
224 self.resolver_mailbox,
225 vote_sender,
226 certificate_sender,
227 );
228
229 let mut shutdown = self.context.stopped();
231 select! {
232 _ = &mut shutdown => {
233 debug!("context shutdown, stopping engine");
234 },
235 voter = &mut voter_task => {
236 debug!(?voter, "voter stopped, shutting down engine");
237 },
238 batcher = &mut batcher_task => {
239 debug!(?batcher, "batcher stopped, shutting down engine");
240 },
241 resolver = &mut resolver_task => {
242 debug!(?resolver, "resolver stopped, shutting down engine");
243 },
244 }
245 }
246}