commonware_consensus/simplex/
engine.rs1use super::{
2 actors::{resolver, voter},
3 config::Config,
4 types::{Activity, Context, View},
5};
6use crate::{Automaton, Relay, Reporter, Supervisor};
7use commonware_cryptography::{Digest, Signer};
8use commonware_macros::select;
9use commonware_p2p::{Receiver, Sender};
10use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
11use governor::clock::Clock as GClock;
12use rand::{CryptoRng, Rng};
13use tracing::debug;
14
15pub struct Engine<
17 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
18 C: Signer,
19 D: Digest,
20 A: Automaton<Context = Context<D>, Digest = D>,
21 R: Relay<Digest = D>,
22 F: Reporter<Activity = Activity<C::Signature, D>>,
23 S: Supervisor<Index = View, PublicKey = C::PublicKey>,
24> {
25 context: E,
26
27 voter: voter::Actor<E, C, D, A, R, F, S>,
28 voter_mailbox: voter::Mailbox<C::Signature, D>,
29 resolver: resolver::Actor<E, C::PublicKey, D, S>,
30 resolver_mailbox: resolver::Mailbox<C::Signature, D>,
31}
32
33impl<
34 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
35 C: Signer,
36 D: Digest,
37 A: Automaton<Context = Context<D>, Digest = D>,
38 R: Relay<Digest = D>,
39 F: Reporter<Activity = Activity<C::Signature, D>>,
40 S: Supervisor<Index = View, PublicKey = C::PublicKey>,
41 > Engine<E, C, D, A, R, F, S>
42{
43 pub fn new(context: E, cfg: Config<C, D, A, R, F, S>) -> Self {
45 cfg.assert();
47
48 let public_key = cfg.crypto.public_key();
50 let (voter, voter_mailbox) = voter::Actor::new(
51 context.with_label("voter"),
52 voter::Config {
53 crypto: cfg.crypto,
54 automaton: cfg.automaton,
55 relay: cfg.relay,
56 reporter: cfg.reporter,
57 supervisor: cfg.supervisor.clone(),
58 partition: cfg.partition,
59 compression: cfg.compression,
60 mailbox_size: cfg.mailbox_size,
61 namespace: cfg.namespace.clone(),
62 max_participants: cfg.max_participants,
63 leader_timeout: cfg.leader_timeout,
64 notarization_timeout: cfg.notarization_timeout,
65 nullify_retry: cfg.nullify_retry,
66 activity_timeout: cfg.activity_timeout,
67 skip_timeout: cfg.skip_timeout,
68 replay_buffer: cfg.replay_buffer,
69 write_buffer: cfg.write_buffer,
70 buffer_pool: cfg.buffer_pool,
71 },
72 );
73
74 let (resolver, resolver_mailbox) = resolver::Actor::new(
76 context.with_label("resolver"),
77 resolver::Config {
78 crypto: public_key,
79 supervisor: cfg.supervisor,
80 mailbox_size: cfg.mailbox_size,
81 namespace: cfg.namespace,
82 max_participants: cfg.max_participants,
83 activity_timeout: cfg.activity_timeout,
84 fetch_timeout: cfg.fetch_timeout,
85 fetch_concurrent: cfg.fetch_concurrent,
86 max_fetch_count: cfg.max_fetch_count,
87 fetch_rate_per_peer: cfg.fetch_rate_per_peer,
88 },
89 );
90
91 Self {
93 context,
94
95 voter,
96 voter_mailbox,
97 resolver,
98 resolver_mailbox,
99 }
100 }
101
102 pub fn start(
106 self,
107 voter_network: (
108 impl Sender<PublicKey = C::PublicKey>,
109 impl Receiver<PublicKey = C::PublicKey>,
110 ),
111 resolver_network: (
112 impl Sender<PublicKey = C::PublicKey>,
113 impl Receiver<PublicKey = C::PublicKey>,
114 ),
115 ) -> Handle<()> {
116 self.context
117 .clone()
118 .spawn(|_| self.run(voter_network, resolver_network))
119 }
120
121 async fn run(
122 self,
123 voter_network: (
124 impl Sender<PublicKey = C::PublicKey>,
125 impl Receiver<PublicKey = C::PublicKey>,
126 ),
127 resolver_network: (
128 impl Sender<PublicKey = C::PublicKey>,
129 impl Receiver<PublicKey = C::PublicKey>,
130 ),
131 ) {
132 let (voter_sender, voter_receiver) = voter_network;
134 let mut voter_task = self
135 .voter
136 .start(self.resolver_mailbox, voter_sender, voter_receiver);
137
138 let (resolver_sender, resolver_receiver) = resolver_network;
140 let mut resolver_task =
141 self.resolver
142 .start(self.voter_mailbox, resolver_sender, resolver_receiver);
143
144 select! {
146 _ = &mut voter_task => {
147 debug!("voter finished");
148 resolver_task.abort();
149 },
150 _ = &mut resolver_task => {
151 debug!("resolver finished");
152 voter_task.abort();
153 },
154 }
155 }
156}