1use super::{
2 actors::{batcher, resolver, voter},
3 config::Config,
4 types::{Activity, Context, View},
5};
6use crate::{Automaton, Relay, Reporter, ThresholdSupervisor};
7use commonware_cryptography::{
8 bls12381::primitives::{group, variant::Variant},
9 Digest, Signer,
10};
11use commonware_macros::select;
12use commonware_p2p::{Blocker, Receiver, Sender};
13use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
14use governor::clock::Clock as GClock;
15use rand::{CryptoRng, Rng};
16use tracing::debug;
17
18pub struct Engine<
20 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
21 C: Signer,
22 B: Blocker<PublicKey = C::PublicKey>,
23 V: Variant,
24 D: Digest,
25 A: Automaton<Context = Context<D>, Digest = D>,
26 R: Relay<Digest = D>,
27 F: Reporter<Activity = Activity<V, D>>,
28 S: ThresholdSupervisor<
29 Index = View,
30 PublicKey = C::PublicKey,
31 Identity = V::Public,
32 Seed = V::Signature,
33 Polynomial = Vec<V::Public>,
34 Share = group::Share,
35 >,
36> {
37 context: E,
38
39 voter: voter::Actor<E, C, B, V, D, A, R, F, S>,
40 voter_mailbox: voter::Mailbox<V, D>,
41
42 batcher: batcher::Actor<E, C::PublicKey, B, V, D, F, S>,
43 batcher_mailbox: batcher::Mailbox<C::PublicKey, V, D>,
44
45 resolver: resolver::Actor<E, C::PublicKey, B, V, D, S>,
46 resolver_mailbox: resolver::Mailbox<V, D>,
47}
48
49impl<
50 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
51 C: Signer,
52 B: Blocker<PublicKey = C::PublicKey>,
53 V: Variant,
54 D: Digest,
55 A: Automaton<Context = Context<D>, Digest = D>,
56 R: Relay<Digest = D>,
57 F: Reporter<Activity = Activity<V, D>>,
58 S: ThresholdSupervisor<
59 Seed = V::Signature,
60 Index = View,
61 Share = group::Share,
62 Polynomial = Vec<V::Public>,
63 Identity = V::Public,
64 PublicKey = C::PublicKey,
65 >,
66 > Engine<E, C, B, V, D, A, R, F, S>
67{
68 pub fn new(context: E, cfg: Config<C, B, V, D, A, R, F, S>) -> Self {
70 cfg.assert();
72
73 let (batcher, batcher_mailbox) = batcher::Actor::new(
75 context.with_label("batcher"),
76 batcher::Config {
77 blocker: cfg.blocker.clone(),
78 reporter: cfg.reporter.clone(),
79 supervisor: cfg.supervisor.clone(),
80 namespace: cfg.namespace.clone(),
81 mailbox_size: cfg.mailbox_size,
82 activity_timeout: cfg.activity_timeout,
83 skip_timeout: cfg.skip_timeout,
84 },
85 );
86
87 let (voter, voter_mailbox) = voter::Actor::new(
89 context.with_label("voter"),
90 voter::Config {
91 crypto: cfg.crypto.clone(),
92 blocker: cfg.blocker.clone(),
93 automaton: cfg.automaton,
94 relay: cfg.relay,
95 reporter: cfg.reporter,
96 supervisor: cfg.supervisor.clone(),
97 partition: cfg.partition,
98 mailbox_size: cfg.mailbox_size,
99 namespace: cfg.namespace.clone(),
100 leader_timeout: cfg.leader_timeout,
101 notarization_timeout: cfg.notarization_timeout,
102 nullify_retry: cfg.nullify_retry,
103 activity_timeout: cfg.activity_timeout,
104 replay_buffer: cfg.replay_buffer,
105 write_buffer: cfg.write_buffer,
106 buffer_pool: cfg.buffer_pool,
107 },
108 );
109
110 let (resolver, resolver_mailbox) = resolver::Actor::new(
112 context.with_label("resolver"),
113 resolver::Config {
114 blocker: cfg.blocker,
115 crypto: cfg.crypto.public_key(),
116 supervisor: cfg.supervisor,
117 mailbox_size: cfg.mailbox_size,
118 namespace: cfg.namespace,
119 activity_timeout: cfg.activity_timeout,
120 fetch_timeout: cfg.fetch_timeout,
121 fetch_concurrent: cfg.fetch_concurrent,
122 max_fetch_count: cfg.max_fetch_count,
123 fetch_rate_per_peer: cfg.fetch_rate_per_peer,
124 },
125 );
126
127 Self {
129 context,
130
131 voter,
132 voter_mailbox,
133
134 batcher,
135 batcher_mailbox,
136
137 resolver,
138 resolver_mailbox,
139 }
140 }
141
142 pub fn start(
146 self,
147 pending_network: (
148 impl Sender<PublicKey = C::PublicKey>,
149 impl Receiver<PublicKey = C::PublicKey>,
150 ),
151 recovered_network: (
152 impl Sender<PublicKey = C::PublicKey>,
153 impl Receiver<PublicKey = C::PublicKey>,
154 ),
155 resolver_network: (
156 impl Sender<PublicKey = C::PublicKey>,
157 impl Receiver<PublicKey = C::PublicKey>,
158 ),
159 ) -> Handle<()> {
160 self.context
161 .clone()
162 .spawn(|_| self.run(pending_network, recovered_network, resolver_network))
163 }
164
165 async fn run(
166 self,
167 pending_network: (
168 impl Sender<PublicKey = C::PublicKey>,
169 impl Receiver<PublicKey = C::PublicKey>,
170 ),
171 recovered_network: (
172 impl Sender<PublicKey = C::PublicKey>,
173 impl Receiver<PublicKey = C::PublicKey>,
174 ),
175 resolver_network: (
176 impl Sender<PublicKey = C::PublicKey>,
177 impl Receiver<PublicKey = C::PublicKey>,
178 ),
179 ) {
180 let (pending_sender, pending_receiver) = pending_network;
182 let mut batcher_task = self
183 .batcher
184 .start(self.voter_mailbox.clone(), pending_receiver);
185
186 let (resolver_sender, resolver_receiver) = resolver_network;
188 let mut resolver_task =
189 self.resolver
190 .start(self.voter_mailbox, resolver_sender, resolver_receiver);
191
192 let (recovered_sender, recovered_receiver) = recovered_network;
194 let mut voter_task = self.voter.start(
195 self.batcher_mailbox,
196 self.resolver_mailbox,
197 pending_sender,
198 recovered_sender,
199 recovered_receiver,
200 );
201
202 select! {
204 _ = &mut voter_task => {
205 debug!("voter finished");
206 resolver_task.abort();
207 batcher_task.abort();
208 },
209 _ = &mut batcher_task => {
210 debug!("batcher finished");
211 voter_task.abort();
212 resolver_task.abort();
213 },
214 _ = &mut resolver_task => {
215 debug!("resolver finished");
216 voter_task.abort();
217 batcher_task.abort();
218 },
219 }
220 }
221}