use crate::{
application::{Block, EpochProvider, Provider},
orchestrator::{ingress::Message, Mailbox},
BLOCKS_PER_EPOCH,
};
use commonware_consensus::{
marshal::{core::Mailbox as MarshalMailbox, standard::Standard},
simplex::{self, elector::Config as Elector, scheme, types::Context, Plan},
types::{Epoch, Epocher, FixedEpocher, ViewDelta},
CertifiableAutomaton, Relay,
};
use commonware_cryptography::{
bls12381::primitives::variant::Variant, certificate::Scheme, Hasher, Signer,
};
use commonware_macros::select_loop;
use commonware_p2p::{
utils::mux::{Builder, MuxHandle, Muxer},
Blocker, Receiver, Sender,
};
use commonware_parallel::Strategy;
use commonware_runtime::{
buffer::paged::CacheRef, spawn_cell, telemetry::metrics::status::GaugeExt, BufferPooler, Clock,
ContextCell, Handle, Metrics, Network, Spawner, Storage,
};
use commonware_utils::{channel::mpsc, vec::NonEmptyVec, NZUsize, NZU16};
use prometheus_client::metrics::gauge::Gauge;
use rand_core::CryptoRngCore;
use std::{collections::BTreeMap, marker::PhantomData, time::Duration};
use tracing::{debug, info, warn};
/// Configuration for constructing an orchestrator [`Actor`].
///
/// Carries everything the actor needs to spin up per-epoch simplex consensus
/// engines: the application hooks, the epoch scheme provider, the marshal
/// mailbox, and sizing knobs for channels and muxers.
pub struct Config<B, V, C, H, A, S, L, T>
where
    B: Blocker<PublicKey = C::PublicKey>,
    V: Variant,
    C: Signer,
    H: Hasher,
    A: CertifiableAutomaton<Context = Context<H::Digest, C::PublicKey>, Digest = H::Digest>
        + Relay<Digest = H::Digest, PublicKey = C::PublicKey, Plan = Plan<C::PublicKey>>,
    S: Scheme,
    L: Elector<S>,
    T: Strategy,
{
    /// Peer blocker, handed to each consensus engine as its `blocker`
    /// (see `enter_epoch`). Named `oracle` here — confirm naming intent with callers.
    pub oracle: B,
    /// Application implementing both the certifiable automaton and relay roles;
    /// cloned into each engine as `automaton` and `relay`.
    pub application: A,
    /// Source of per-epoch signing schemes; also tracks which epochs are registered.
    pub provider: Provider<S, C>,
    /// Mailbox to the marshal; used as the engines' reporter and for
    /// finalization hints on epoch boundaries.
    pub marshal: MarshalMailbox<S, Standard<Block<H, C, V>>>,
    /// Parallel execution strategy cloned into each consensus engine.
    pub strategy: T,
    /// Size passed to each of the three muxers (votes, certificates, resolver).
    pub muxer_size: usize,
    /// Capacity of the orchestrator's mpsc mailbox channel.
    pub mailbox_size: usize,
    /// Prefix for per-epoch consensus storage partitions
    /// (`"{prefix}_consensus_{epoch}"`).
    pub partition_prefix: String,
    /// Carries the elector type `L`, which is only instantiated (via
    /// `L::default()`) when an epoch is entered and is never stored.
    pub _phantom: PhantomData<L>,
}
/// Orchestrator actor: listens for epoch enter/exit transitions on its mailbox
/// and starts/stops one simplex consensus engine per active epoch, multiplexing
/// the vote, certificate, and resolver channels across epochs.
pub struct Actor<E, B, V, C, H, A, S, L, T>
where
    E: BufferPooler + Spawner + Metrics + CryptoRngCore + Clock + Storage + Network,
    B: Blocker<PublicKey = C::PublicKey>,
    V: Variant,
    C: Signer,
    H: Hasher,
    A: CertifiableAutomaton<Context = Context<H::Digest, C::PublicKey>, Digest = H::Digest>
        + Relay<Digest = H::Digest, PublicKey = C::PublicKey, Plan = Plan<C::PublicKey>>,
    S: Scheme,
    L: Elector<S>,
    T: Strategy,
    Provider<S, C>: EpochProvider<Variant = V, PublicKey = C::PublicKey, Scheme = S>,
{
    // Runtime context cell; ownership is moved into the spawned task via `spawn_cell!`.
    context: ContextCell<E>,
    // Receiving end of the orchestrator mailbox (epoch Enter/Exit messages).
    mailbox: mpsc::Receiver<Message<V, C::PublicKey>>,
    // Application hooks, cloned into every engine (automaton + relay).
    application: A,
    // Peer blocker, cloned into every engine.
    oracle: B,
    // Marshal mailbox: engines' reporter and target for finalization hints.
    marshal: MarshalMailbox<S, Standard<Block<H, C, V>>>,
    // Per-epoch scheme provider; epochs are registered on enter, unregistered on exit.
    provider: Provider<S, C>,
    // Parallel execution strategy cloned into every engine.
    strategy: T,
    // Size passed to each muxer.
    muxer_size: usize,
    // Prefix for per-epoch consensus storage partitions.
    partition_prefix: String,
    // Shared page cache handle, cloned into every engine's config.
    page_cache_ref: CacheRef,
    // Prometheus gauge tracking the most recently entered epoch.
    latest_epoch: Gauge,
    // Marker for the elector type `L` (instantiated per epoch, never stored).
    _phantom: PhantomData<L>,
}
impl<E, B, V, C, H, A, S, L, T> Actor<E, B, V, C, H, A, S, L, T>
where
    E: BufferPooler + Spawner + Metrics + CryptoRngCore + Clock + Storage + Network,
    B: Blocker<PublicKey = C::PublicKey>,
    V: Variant,
    C: Signer,
    H: Hasher,
    A: CertifiableAutomaton<Context = Context<H::Digest, C::PublicKey>, Digest = H::Digest>
        + Relay<Digest = H::Digest, PublicKey = C::PublicKey, Plan = Plan<C::PublicKey>>,
    S: scheme::Scheme<H::Digest, PublicKey = C::PublicKey>,
    L: Elector<S>,
    T: Strategy,
    Provider<S, C>: EpochProvider<Variant = V, PublicKey = C::PublicKey, Scheme = S>,
{
    /// Builds the actor from `config` and returns it together with a [`Mailbox`]
    /// handle for sending epoch transition messages to it.
    pub fn new(
        context: E,
        config: Config<B, V, C, H, A, S, L, T>,
    ) -> (Self, Mailbox<V, C::PublicKey>) {
        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
        // Shared page cache for all engines; arguments are presumably page size
        // (16_384) and capacity (10_000) — confirm against CacheRef::from_pooler.
        let page_cache_ref = CacheRef::from_pooler(&context, NZU16!(16_384), NZUsize!(10_000));
        let latest_epoch = Gauge::default();
        context.register("latest_epoch", "current epoch", latest_epoch.clone());
        (
            Self {
                context: ContextCell::new(context),
                mailbox,
                application: config.application,
                oracle: config.oracle,
                marshal: config.marshal,
                provider: config.provider,
                strategy: config.strategy,
                muxer_size: config.muxer_size,
                partition_prefix: config.partition_prefix,
                page_cache_ref,
                latest_epoch,
                _phantom: PhantomData,
            },
            Mailbox::new(sender),
        )
    }

    /// Spawns the orchestrator loop (`run`) on the actor's context, handing it
    /// the three physical channel pairs it will multiplex across epochs:
    /// votes, certificates, and resolver traffic.
    pub fn start(
        mut self,
        votes: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        certificates: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        resolver: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(votes, certificates, resolver,).await)
    }

    /// Main orchestrator loop: sets up the three epoch muxers, then reacts to
    /// (a) backup vote traffic from unregistered epochs and (b) epoch
    /// enter/exit messages on the mailbox, until shutdown.
    async fn run(
        mut self,
        (vote_sender, vote_receiver): (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        (certificate_sender, certificate_receiver): (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        (resolver_sender, resolver_receiver): (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) {
        // Vote muxer is the only one built with a backup channel: messages for
        // epochs without a registered sub-channel land on `vote_backup` and are
        // used below to detect peers that have advanced to a future epoch.
        let (mux, mut vote_mux, mut vote_backup) = Muxer::builder(
            self.context.with_label("vote_mux"),
            vote_sender,
            vote_receiver,
            self.muxer_size,
        )
        .with_backup()
        .build();
        mux.start();
        let (mux, mut certificate_mux) = Muxer::builder(
            self.context.with_label("certificate_mux"),
            certificate_sender,
            certificate_receiver,
            self.muxer_size,
        )
        .build();
        mux.start();
        let (mux, mut resolver_mux) = Muxer::new(
            self.context.with_label("resolver_mux"),
            resolver_sender,
            resolver_receiver,
            self.muxer_size,
        );
        mux.start();
        // Fixed epoch schedule: every epoch spans BLOCKS_PER_EPOCH blocks.
        let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
        // Active engines keyed by epoch. The ContextCell scope is retained so
        // the engine's spawned tasks stay alive until the epoch is exited.
        let mut engines: BTreeMap<Epoch, (Handle<()>, ContextCell<E>)> = BTreeMap::new();
        select_loop! {
            self.context,
            on_stopped => {
                debug!("context shutdown, stopping orchestrator");
            },
            // Backup vote traffic: a message tagged with an epoch we have no
            // registered sub-channel for.
            Some((their_epoch, (from, _))) = vote_backup.recv() else {
                warn!("vote mux backup channel closed, shutting down orchestrator");
                break;
            } => {
                let their_epoch = Epoch::new(their_epoch);
                // Our current epoch is the highest key in the engine map
                // (BTreeMap keys iterate in ascending order).
                let Some(our_epoch) = engines.keys().last().copied() else {
                    debug!(%their_epoch, ?from, "received message from unregistered epoch with no known epochs");
                    continue;
                };
                // Traffic from an epoch at or below ours is stale; ignore it.
                if their_epoch <= our_epoch {
                    debug!(%their_epoch, %our_epoch, ?from, "received message from past epoch");
                    continue;
                }
                // A peer is ahead of us: ask the marshal to ensure the final
                // block of our current epoch gets finalized, hinting the sender
                // as a node that should have it.
                let boundary_height = epocher.last(our_epoch).expect("our epoch should exist");
                debug!(
                    ?from,
                    %their_epoch,
                    %our_epoch,
                    %boundary_height,
                    "received backup message from future epoch, ensuring boundary finalization"
                );
                self.marshal
                    .hint_finalized(boundary_height, NonEmptyVec::new(from))
                    .await;
            },
            // Epoch lifecycle messages from the orchestrator mailbox.
            Some(transition) = self.mailbox.recv() else {
                warn!("mailbox closed, shutting down orchestrator");
                break;
            } => match transition {
                Message::Enter(transition) => {
                    // Entering an already-active epoch is a no-op (but logged).
                    if engines.contains_key(&transition.epoch) {
                        warn!(epoch = %transition.epoch, "entered existing epoch");
                        continue;
                    }
                    let scheme = self.provider.scheme_for_epoch(&transition);
                    // Registration must succeed: the contains_key check above
                    // ensures this epoch was not registered before.
                    assert!(self.provider.register(transition.epoch, scheme.clone()));
                    let (handle, scope) = self
                        .enter_epoch(
                            transition.epoch,
                            scheme,
                            &mut vote_mux,
                            &mut certificate_mux,
                            &mut resolver_mux,
                        )
                        .await;
                    engines.insert(transition.epoch, (handle, scope));
                    // Best-effort metric update; failure is ignored deliberately.
                    let _ = self.latest_epoch.try_set(transition.epoch.get());
                    info!(epoch = %transition.epoch, "entered epoch");
                }
                Message::Exit(epoch) => {
                    let Some((handle, _scope)) = engines.remove(&epoch) else {
                        warn!(%epoch, "exited non-existent epoch");
                        continue;
                    };
                    // Abort the engine task; dropping `_scope` at the end of
                    // this arm presumably tears down anything spawned under the
                    // epoch's scoped context — confirm against ContextCell docs.
                    handle.abort();
                    // Must succeed: only registered epochs are inserted into `engines`.
                    assert!(self.provider.unregister(&epoch));
                    info!(%epoch, "exited epoch");
                }
            },
        }
    }

    /// Creates and starts a simplex consensus engine for `epoch` under a
    /// dedicated scoped context, registering the epoch's sub-channels on each
    /// of the three muxers. Returns the engine's task handle and its scope
    /// (which the caller must keep alive for the lifetime of the epoch).
    async fn enter_epoch(
        &mut self,
        epoch: Epoch,
        scheme: S,
        vote_mux: &mut MuxHandle<
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        >,
        certificate_mux: &mut MuxHandle<
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        >,
        resolver_mux: &mut MuxHandle<
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        >,
    ) -> (Handle<()>, ContextCell<E>) {
        // Fresh leader elector per epoch.
        let elector = L::default();
        // Scoped, labeled context so the engine's tasks and metrics are
        // attributable to this epoch.
        let scope = self
            .context
            .with_label("consensus_engine")
            .with_attribute("epoch", epoch)
            .with_scope();
        let engine = simplex::Engine::new(
            scope.clone(),
            simplex::Config {
                scheme,
                elector,
                blocker: self.oracle.clone(),
                automaton: self.application.clone(),
                relay: self.application.clone(),
                reporter: self.marshal.clone(),
                // Per-epoch storage partition, e.g. "{prefix}_consensus_{epoch}".
                partition: format!("{}_consensus_{}", self.partition_prefix, epoch),
                mailbox_size: 1024,
                epoch,
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                leader_timeout: Duration::from_secs(1),
                certification_timeout: Duration::from_secs(2),
                timeout_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout: ViewDelta::new(256),
                skip_timeout: ViewDelta::new(10),
                fetch_concurrent: 32,
                page_cache: self.page_cache_ref.clone(),
                strategy: self.strategy.clone(),
                forwarding: simplex::ForwardingPolicy::Disabled,
            },
        );
        // Register this epoch's sub-channel on each muxer. The unwraps assume
        // registration for a new epoch cannot fail — the caller checks the
        // epoch is not already active before calling us.
        let vote = vote_mux.register(epoch.get()).await.unwrap();
        let certificate = certificate_mux.register(epoch.get()).await.unwrap();
        let resolver = resolver_mux.register(epoch.get()).await.unwrap();
        (engine.start(vote, certificate, resolver), scope)
    }
}