1use super::{
2 actors::{batcher, resolver, voter},
3 config::Config,
4 types::{Activity, Context},
5};
6use crate::{simplex::signing_scheme::Scheme, Automaton, Relay, Reporter};
7use commonware_cryptography::{Digest, PublicKey};
8use commonware_macros::select;
9use commonware_p2p::{Blocker, Receiver, Sender};
10use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage};
11use governor::clock::Clock as GClock;
12use rand::{CryptoRng, Rng};
13use tracing::debug;
14
15pub struct Engine<
17 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
18 P: PublicKey,
19 S: Scheme<PublicKey = P>,
20 B: Blocker<PublicKey = P>,
21 D: Digest,
22 A: Automaton<Context = Context<D, P>, Digest = D>,
23 R: Relay<Digest = D>,
24 F: Reporter<Activity = Activity<S, D>>,
25> {
26 context: ContextCell<E>,
27
28 voter: voter::Actor<E, P, S, B, D, A, R, F>,
29 voter_mailbox: voter::Mailbox<S, D>,
30
31 batcher: batcher::Actor<E, P, S, B, D, F>,
32 batcher_mailbox: batcher::Mailbox<S, D>,
33
34 resolver: resolver::Actor<E, P, S, B, D>,
35 resolver_mailbox: resolver::Mailbox<S, D>,
36}
37
38impl<
39 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
40 P: PublicKey,
41 S: Scheme<PublicKey = P>,
42 B: Blocker<PublicKey = P>,
43 D: Digest,
44 A: Automaton<Context = Context<D, P>, Digest = D>,
45 R: Relay<Digest = D>,
46 F: Reporter<Activity = Activity<S, D>>,
47 > Engine<E, P, S, B, D, A, R, F>
48{
49 pub fn new(context: E, cfg: Config<P, S, B, D, A, R, F>) -> Self {
51 cfg.assert();
53
54 let (batcher, batcher_mailbox) = batcher::Actor::new(
56 context.with_label("batcher"),
57 batcher::Config {
58 scheme: cfg.scheme.clone(),
59 blocker: cfg.blocker.clone(),
60 reporter: cfg.reporter.clone(),
61 epoch: cfg.epoch,
62 namespace: cfg.namespace.clone(),
63 mailbox_size: cfg.mailbox_size,
64 activity_timeout: cfg.activity_timeout,
65 skip_timeout: cfg.skip_timeout,
66 },
67 );
68
69 let (voter, voter_mailbox) = voter::Actor::new(
71 context.with_label("voter"),
72 voter::Config {
73 scheme: cfg.scheme.clone(),
74 blocker: cfg.blocker.clone(),
75 automaton: cfg.automaton,
76 relay: cfg.relay,
77 reporter: cfg.reporter,
78 partition: cfg.partition,
79 mailbox_size: cfg.mailbox_size,
80 epoch: cfg.epoch,
81 namespace: cfg.namespace.clone(),
82 leader_timeout: cfg.leader_timeout,
83 notarization_timeout: cfg.notarization_timeout,
84 nullify_retry: cfg.nullify_retry,
85 activity_timeout: cfg.activity_timeout,
86 replay_buffer: cfg.replay_buffer,
87 write_buffer: cfg.write_buffer,
88 buffer_pool: cfg.buffer_pool,
89 },
90 );
91
92 let (resolver, resolver_mailbox) = resolver::Actor::new(
94 context.with_label("resolver"),
95 resolver::Config {
96 blocker: cfg.blocker,
97 scheme: cfg.scheme,
98 mailbox_size: cfg.mailbox_size,
99 epoch: cfg.epoch,
100 namespace: cfg.namespace,
101 activity_timeout: cfg.activity_timeout,
102 fetch_timeout: cfg.fetch_timeout,
103 fetch_concurrent: cfg.fetch_concurrent,
104 max_fetch_count: cfg.max_fetch_count,
105 fetch_rate_per_peer: cfg.fetch_rate_per_peer,
106 },
107 );
108
109 Self {
111 context: ContextCell::new(context),
112
113 voter,
114 voter_mailbox,
115
116 batcher,
117 batcher_mailbox,
118
119 resolver,
120 resolver_mailbox,
121 }
122 }
123
124 pub fn start(
128 mut self,
129 pending_network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
130 recovered_network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
131 resolver_network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
132 ) -> Handle<()> {
133 spawn_cell!(
134 self.context,
135 self.run(pending_network, recovered_network, resolver_network)
136 .await
137 )
138 }
139
140 async fn run(
141 self,
142 pending_network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
143 recovered_network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
144 resolver_network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
145 ) {
146 let (pending_sender, pending_receiver) = pending_network;
148 let mut batcher_task = self
149 .batcher
150 .start(self.voter_mailbox.clone(), pending_receiver);
151
152 let (resolver_sender, resolver_receiver) = resolver_network;
154 let mut resolver_task =
155 self.resolver
156 .start(self.voter_mailbox, resolver_sender, resolver_receiver);
157
158 let (recovered_sender, recovered_receiver) = recovered_network;
160 let mut voter_task = self.voter.start(
161 self.batcher_mailbox,
162 self.resolver_mailbox,
163 pending_sender,
164 recovered_sender,
165 recovered_receiver,
166 );
167
168 let mut shutdown = self.context.stopped();
170 select! {
171 _ = &mut shutdown => {
172 debug!("shutdown");
173 },
174 _ = &mut voter_task => {
175 panic!("voter should not finish");
176 },
177 _ = &mut batcher_task => {
178 panic!("batcher should not finish");
179 },
180 _ = &mut resolver_task => {
181 panic!("resolver should not finish");
182 },
183 }
184 }
185}