commonware_consensus/simplex/
engine.rs1use super::{
2 actors::{resolver, voter},
3 config::Config,
4 Context, View,
5};
6use crate::{Automaton, Committer, Relay, Supervisor};
7use commonware_cryptography::Scheme;
8use commonware_macros::select;
9use commonware_p2p::{Receiver, Sender};
10use commonware_runtime::{Blob, Clock, Handle, Metrics, Spawner, Storage};
11use commonware_storage::journal::variable::Journal;
12use commonware_utils::Array;
13use governor::clock::Clock as GClock;
14use rand::{CryptoRng, Rng};
15use tracing::debug;
16
17pub struct Engine<
19 B: Blob,
20 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics,
21 C: Scheme,
22 D: Array,
23 A: Automaton<Context = Context<D>, Digest = D>,
24 R: Relay<Digest = D>,
25 F: Committer<Digest = D>,
26 S: Supervisor<Index = View, PublicKey = C::PublicKey>,
27> {
28 context: E,
29
30 voter: voter::Actor<B, E, C, D, A, R, F, S>,
31 voter_mailbox: voter::Mailbox<D>,
32 resolver: resolver::Actor<E, C, D, S>,
33 resolver_mailbox: resolver::Mailbox,
34}
35
36impl<
37 B: Blob,
38 E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B> + Metrics,
39 C: Scheme,
40 D: Array,
41 A: Automaton<Context = Context<D>, Digest = D>,
42 R: Relay<Digest = D>,
43 F: Committer<Digest = D>,
44 S: Supervisor<Index = View, PublicKey = C::PublicKey>,
45 > Engine<B, E, C, D, A, R, F, S>
46{
47 pub fn new(context: E, journal: Journal<B, E>, cfg: Config<C, D, A, R, F, S>) -> Self {
49 cfg.assert();
51
52 let (voter, voter_mailbox) = voter::Actor::new(
54 context.with_label("voter"),
55 journal,
56 voter::Config {
57 crypto: cfg.crypto.clone(),
58 automaton: cfg.automaton,
59 relay: cfg.relay,
60 committer: cfg.committer,
61 supervisor: cfg.supervisor.clone(),
62 mailbox_size: cfg.mailbox_size,
63 namespace: cfg.namespace.clone(),
64 leader_timeout: cfg.leader_timeout,
65 notarization_timeout: cfg.notarization_timeout,
66 nullify_retry: cfg.nullify_retry,
67 activity_timeout: cfg.activity_timeout,
68 replay_concurrency: cfg.replay_concurrency,
69 },
70 );
71
72 let (resolver, resolver_mailbox) = resolver::Actor::new(
74 context.with_label("resolver"),
75 resolver::Config {
76 crypto: cfg.crypto,
77 supervisor: cfg.supervisor,
78 mailbox_size: cfg.mailbox_size,
79 namespace: cfg.namespace,
80 activity_timeout: cfg.activity_timeout,
81 fetch_timeout: cfg.fetch_timeout,
82 fetch_concurrent: cfg.fetch_concurrent,
83 max_fetch_count: cfg.max_fetch_count,
84 max_fetch_size: cfg.max_fetch_size,
85 fetch_rate_per_peer: cfg.fetch_rate_per_peer,
86 },
87 );
88
89 Self {
91 context,
92
93 voter,
94 voter_mailbox,
95 resolver,
96 resolver_mailbox,
97 }
98 }
99
100 pub fn start(
104 self,
105 voter_network: (
106 impl Sender<PublicKey = C::PublicKey>,
107 impl Receiver<PublicKey = C::PublicKey>,
108 ),
109 resolver_network: (
110 impl Sender<PublicKey = C::PublicKey>,
111 impl Receiver<PublicKey = C::PublicKey>,
112 ),
113 ) -> Handle<()> {
114 self.context
115 .clone()
116 .spawn(|_| self.run(voter_network, resolver_network))
117 }
118
119 async fn run(
120 self,
121 voter_network: (
122 impl Sender<PublicKey = C::PublicKey>,
123 impl Receiver<PublicKey = C::PublicKey>,
124 ),
125 resolver_network: (
126 impl Sender<PublicKey = C::PublicKey>,
127 impl Receiver<PublicKey = C::PublicKey>,
128 ),
129 ) {
130 let (voter_sender, voter_receiver) = voter_network;
132 let mut voter_task = self
133 .voter
134 .start(self.resolver_mailbox, voter_sender, voter_receiver);
135
136 let (resolver_sender, resolver_receiver) = resolver_network;
138 let mut resolver_task =
139 self.resolver
140 .start(self.voter_mailbox, resolver_sender, resolver_receiver);
141
142 select! {
144 _ = &mut voter_task => {
145 debug!("voter finished");
146 resolver_task.abort();
147 },
148 _ = &mut resolver_task => {
149 debug!("resolver finished");
150 voter_task.abort();
151 },
152 }
153 }
154}