// commonware_consensus/simplex/engine.rs
use super::{
actors::{resolver, voter},
config::Config,
Context, View,
};
use crate::{Automaton, Committer, Relay, Supervisor};
use commonware_cryptography::{Hasher, Scheme};
use commonware_macros::select;
use commonware_p2p::{Receiver, Sender};
use commonware_runtime::{Blob, Clock, Spawner, Storage};
use commonware_storage::journal::Journal;
use governor::clock::Clock as GClock;
use rand::{CryptoRng, Rng};
use tracing::debug;
/// Top-level handle for the simplex consensus engine.
///
/// Bundles the runtime with the `voter` and `resolver` actors and the mailbox
/// returned alongside each of them, so that `run` can hand each actor the
/// other's mailbox and spawn both.
pub struct Engine<B, E, C, H, A, R, F, S>
where
    B: Blob,
    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B>,
    C: Scheme,
    H: Hasher,
    A: Automaton<Context = Context>,
    R: Relay,
    F: Committer,
    S: Supervisor<Seed = (), Index = View>,
{
    /// Runtime on which the actor tasks are spawned.
    runtime: E,

    /// Voter actor, created in `new` and consumed by `run`.
    voter: voter::Actor<B, E, C, H, A, R, F, S>,
    /// Mailbox paired with the voter actor; handed to the resolver in `run`.
    voter_mailbox: voter::Mailbox,

    /// Resolver actor, created in `new` and consumed by `run`.
    resolver: resolver::Actor<E, C, H, S>,
    /// Mailbox paired with the resolver actor; handed to the voter in `run`.
    resolver_mailbox: resolver::Mailbox,
}
impl<
B: Blob,
E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B>,
C: Scheme,
H: Hasher,
A: Automaton<Context = Context>,
R: Relay,
F: Committer,
S: Supervisor<Seed = (), Index = View>,
> Engine<B, E, C, H, A, R, F, S>
{
pub fn new(runtime: E, journal: Journal<B, E>, cfg: Config<C, H, A, R, F, S>) -> Self {
cfg.assert();
let (voter, voter_mailbox) = voter::Actor::new(
runtime.clone(),
journal,
voter::Config {
crypto: cfg.crypto.clone(),
hasher: cfg.hasher,
automaton: cfg.automaton,
relay: cfg.relay,
committer: cfg.committer,
supervisor: cfg.supervisor.clone(),
registry: cfg.registry.clone(),
mailbox_size: cfg.mailbox_size,
namespace: cfg.namespace.clone(),
leader_timeout: cfg.leader_timeout,
notarization_timeout: cfg.notarization_timeout,
nullify_retry: cfg.nullify_retry,
activity_timeout: cfg.activity_timeout,
replay_concurrency: cfg.replay_concurrency,
},
);
let (resolver, resolver_mailbox) = resolver::Actor::new(
runtime.clone(),
resolver::Config {
crypto: cfg.crypto,
supervisor: cfg.supervisor,
registry: cfg.registry,
mailbox_size: cfg.mailbox_size,
namespace: cfg.namespace,
activity_timeout: cfg.activity_timeout,
fetch_timeout: cfg.fetch_timeout,
fetch_concurrent: cfg.fetch_concurrent,
max_fetch_count: cfg.max_fetch_count,
max_fetch_size: cfg.max_fetch_size,
fetch_rate_per_peer: cfg.fetch_rate_per_peer,
},
);
Self {
runtime,
voter,
voter_mailbox,
resolver,
resolver_mailbox,
}
}
pub async fn run(
self,
voter_network: (impl Sender, impl Receiver),
resolver_network: (impl Sender, impl Receiver),
) {
let (voter_sender, voter_receiver) = voter_network;
let mut voter = self.runtime.spawn("voter", async move {
self.voter
.run(self.resolver_mailbox, voter_sender, voter_receiver)
.await;
});
let (resolver_sender, resolver_receiver) = resolver_network;
let mut resolver = self.runtime.spawn("resolver", async move {
self.resolver
.run(self.voter_mailbox, resolver_sender, resolver_receiver)
.await;
});
select! {
_ = &mut voter => {
debug!("voter finished");
resolver.abort();
},
_ = &mut resolver => {
debug!("resolver finished");
voter.abort();
},
}
}
}