1use super::{
2 actors::{batcher, resolver, voter},
3 config::Config,
4 types::{Activity, Context, View},
5};
6use crate::{Automaton, Relay, Reporter, ThresholdSupervisor};
7use commonware_cryptography::{
8 bls12381::primitives::{group, variant::Variant},
9 Digest, Signer,
10};
11use commonware_macros::select;
12use commonware_p2p::{Blocker, Receiver, Sender};
13use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
14use governor::clock::Clock as GClock;
15use rand::{CryptoRng, Rng};
16use tracing::debug;
17
18pub struct Engine<
20 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
21 C: Signer,
22 B: Blocker<PublicKey = C::PublicKey>,
23 V: Variant,
24 D: Digest,
25 A: Automaton<Context = Context<D>, Digest = D>,
26 R: Relay<Digest = D>,
27 F: Reporter<Activity = Activity<V, D>>,
28 S: ThresholdSupervisor<
29 Index = View,
30 PublicKey = C::PublicKey,
31 Identity = V::Public,
32 Seed = V::Signature,
33 Polynomial = Vec<V::Public>,
34 Share = group::Share,
35 >,
36> {
37 context: E,
38
39 voter: voter::Actor<E, C, B, V, D, A, R, F, S>,
40 voter_mailbox: voter::Mailbox<V, D>,
41
42 batcher: batcher::Actor<E, C::PublicKey, B, V, D, F, S>,
43 batcher_mailbox: batcher::Mailbox<C::PublicKey, V, D>,
44
45 resolver: resolver::Actor<E, C::PublicKey, B, V, D, S>,
46 resolver_mailbox: resolver::Mailbox<V, D>,
47}
48
49impl<
50 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
51 C: Signer,
52 B: Blocker<PublicKey = C::PublicKey>,
53 V: Variant,
54 D: Digest,
55 A: Automaton<Context = Context<D>, Digest = D>,
56 R: Relay<Digest = D>,
57 F: Reporter<Activity = Activity<V, D>>,
58 S: ThresholdSupervisor<
59 Seed = V::Signature,
60 Index = View,
61 Share = group::Share,
62 Polynomial = Vec<V::Public>,
63 Identity = V::Public,
64 PublicKey = C::PublicKey,
65 >,
66 > Engine<E, C, B, V, D, A, R, F, S>
67{
68 pub fn new(context: E, cfg: Config<C, B, V, D, A, R, F, S>) -> Self {
70 cfg.assert();
72
73 let (batcher, batcher_mailbox) = batcher::Actor::new(
75 context.with_label("batcher"),
76 batcher::Config {
77 blocker: cfg.blocker.clone(),
78 reporter: cfg.reporter.clone(),
79 supervisor: cfg.supervisor.clone(),
80 namespace: cfg.namespace.clone(),
81 mailbox_size: cfg.mailbox_size,
82 activity_timeout: cfg.activity_timeout,
83 skip_timeout: cfg.skip_timeout,
84 },
85 );
86
87 let (voter, voter_mailbox) = voter::Actor::new(
89 context.with_label("voter"),
90 voter::Config {
91 crypto: cfg.crypto.clone(),
92 blocker: cfg.blocker.clone(),
93 automaton: cfg.automaton,
94 relay: cfg.relay,
95 reporter: cfg.reporter,
96 supervisor: cfg.supervisor.clone(),
97 partition: cfg.partition,
98 compression: cfg.compression,
99 mailbox_size: cfg.mailbox_size,
100 namespace: cfg.namespace.clone(),
101 leader_timeout: cfg.leader_timeout,
102 notarization_timeout: cfg.notarization_timeout,
103 nullify_retry: cfg.nullify_retry,
104 activity_timeout: cfg.activity_timeout,
105 replay_buffer: cfg.replay_buffer,
106 write_buffer: cfg.write_buffer,
107 buffer_pool: cfg.buffer_pool,
108 },
109 );
110
111 let (resolver, resolver_mailbox) = resolver::Actor::new(
113 context.with_label("resolver"),
114 resolver::Config {
115 blocker: cfg.blocker,
116 crypto: cfg.crypto.public_key(),
117 supervisor: cfg.supervisor,
118 mailbox_size: cfg.mailbox_size,
119 namespace: cfg.namespace,
120 activity_timeout: cfg.activity_timeout,
121 fetch_timeout: cfg.fetch_timeout,
122 fetch_concurrent: cfg.fetch_concurrent,
123 max_fetch_count: cfg.max_fetch_count,
124 fetch_rate_per_peer: cfg.fetch_rate_per_peer,
125 },
126 );
127
128 Self {
130 context,
131
132 voter,
133 voter_mailbox,
134
135 batcher,
136 batcher_mailbox,
137
138 resolver,
139 resolver_mailbox,
140 }
141 }
142
143 pub fn start(
147 self,
148 pending_network: (
149 impl Sender<PublicKey = C::PublicKey>,
150 impl Receiver<PublicKey = C::PublicKey>,
151 ),
152 recovered_network: (
153 impl Sender<PublicKey = C::PublicKey>,
154 impl Receiver<PublicKey = C::PublicKey>,
155 ),
156 resolver_network: (
157 impl Sender<PublicKey = C::PublicKey>,
158 impl Receiver<PublicKey = C::PublicKey>,
159 ),
160 ) -> Handle<()> {
161 self.context
162 .clone()
163 .spawn(|_| self.run(pending_network, recovered_network, resolver_network))
164 }
165
166 async fn run(
167 self,
168 pending_network: (
169 impl Sender<PublicKey = C::PublicKey>,
170 impl Receiver<PublicKey = C::PublicKey>,
171 ),
172 recovered_network: (
173 impl Sender<PublicKey = C::PublicKey>,
174 impl Receiver<PublicKey = C::PublicKey>,
175 ),
176 resolver_network: (
177 impl Sender<PublicKey = C::PublicKey>,
178 impl Receiver<PublicKey = C::PublicKey>,
179 ),
180 ) {
181 let (pending_sender, pending_receiver) = pending_network;
183 let mut batcher_task = self
184 .batcher
185 .start(self.voter_mailbox.clone(), pending_receiver);
186
187 let (resolver_sender, resolver_receiver) = resolver_network;
189 let mut resolver_task =
190 self.resolver
191 .start(self.voter_mailbox, resolver_sender, resolver_receiver);
192
193 let (recovered_sender, recovered_receiver) = recovered_network;
195 let mut voter_task = self.voter.start(
196 self.batcher_mailbox,
197 self.resolver_mailbox,
198 pending_sender,
199 recovered_sender,
200 recovered_receiver,
201 );
202
203 select! {
205 _ = &mut voter_task => {
206 debug!("voter finished");
207 resolver_task.abort();
208 batcher_task.abort();
209 },
210 _ = &mut batcher_task => {
211 debug!("batcher finished");
212 voter_task.abort();
213 resolver_task.abort();
214 },
215 _ = &mut resolver_task => {
216 debug!("resolver finished");
217 voter_task.abort();
218 batcher_task.abort();
219 },
220 }
221 }
222}