mod event_queue_metrics;
pub mod initializer;
pub mod initializer2;
pub mod joiner;
mod queue_kind;
pub mod validator;
use std::{
collections::HashMap,
env,
fmt::{Debug, Display},
fs::File,
mem,
str::FromStr,
sync::atomic::Ordering,
};
use datasize::DataSize;
use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use prometheus::{self, Histogram, HistogramOpts, IntCounter, Registry};
use quanta::IntoNanoseconds;
use tracing::{debug, debug_span, info, trace, warn};
use tracing_futures::Instrument;
use crate::{
effect::{Effect, EffectBuilder, Effects},
utils::{self, WeightedRoundRobin},
NodeRng,
};
use quanta::Clock;
pub use queue_kind::QueueKind;
use serde::Serialize;
use tokio::time::{Duration, Instant};
/// Dispatch duration above which a warning is logged when the environment does not override it.
const DEFAULT_DISPATCH_EVENT_THRESHOLD: Duration = Duration::from_secs(1);
/// Name of the environment variable overriding the threshold; its value is parsed as a `u64`
/// count of microseconds.
const DISPATCH_EVENT_THRESHOLD_ENV_VAR: &str = "CL_EVENT_MAX_MICROSECS";

/// Effective dispatch threshold, resolved lazily on first access.
///
/// Falls back to [`DEFAULT_DISPATCH_EVENT_THRESHOLD`] when the environment variable cannot be
/// read.
///
/// # Panics
///
/// Panics on first access if the variable is set but its value is not a valid `u64`.
static DISPATCH_EVENT_THRESHOLD: Lazy<Duration> = Lazy::new(|| {
    match env::var(DISPATCH_EVENT_THRESHOLD_ENV_VAR) {
        Ok(threshold_str) => match u64::from_str(&threshold_str) {
            Ok(threshold_microsecs) => Duration::from_micros(threshold_microsecs),
            Err(error) => panic!(
                "can't parse env var {}={} as a u64: {}",
                DISPATCH_EVENT_THRESHOLD_ENV_VAR, threshold_str, error
            ),
        },
        // Any failure to read the variable (unset or not valid unicode) selects the default.
        Err(_) => DEFAULT_DISPATCH_EVENT_THRESHOLD,
    }
});
/// Event scheduler used by the reactor: a [`WeightedRoundRobin`] holding events of type `Ev`,
/// with queues keyed by [`QueueKind`].
pub type Scheduler<Ev> = WeightedRoundRobin<Ev, QueueKind>;
/// Handle to the reactor's event queue.
///
/// Wraps a `'static` reference to the [`Scheduler`] so that events of type `REv` can be
/// scheduled through it.
#[derive(DataSize, Debug)]
pub struct EventQueueHandle<REv>(&'static Scheduler<REv>)
where
REv: 'static;
impl<REv> Clone for EventQueueHandle<REv> {
fn clone(&self) -> Self {
EventQueueHandle(self.0)
}
}
// The handle only holds a shared `'static` reference, so it is `Copy` for any `REv`.
impl<REv> Copy for EventQueueHandle<REv> {}
impl<REv> EventQueueHandle<REv> {
pub(crate) fn new(scheduler: &'static Scheduler<REv>) -> Self {
EventQueueHandle(scheduler)
}
#[inline]
pub(crate) async fn schedule<Ev>(self, event: Ev, queue_kind: QueueKind)
where
REv: From<Ev>,
{
self.0.push(event.into(), queue_kind).await
}
pub(crate) fn event_queues_counts(&self) -> HashMap<QueueKind, usize> {
self.0.event_queues_counts()
}
}
/// Core reactor abstraction driven by a [`Runner`].
///
/// A reactor is constructed from a configuration, receives events one at a time through
/// `dispatch_event` and answers each with a set of effects.
pub trait Reactor: Sized {
/// Event type the reactor processes; must be sendable, printable and `'static`.
type Event: Send + Debug + Display + 'static;
/// Configuration value consumed by `new`.
type Config;
/// Error type returned when construction fails.
type Error: Send + 'static;
/// Processes a single `event` and returns the effects resulting from it.
///
/// `effect_builder` and `rng` are supplied by the caller for use while handling the event.
fn dispatch_event(
&mut self,
effect_builder: EffectBuilder<Self::Event>,
rng: &mut NodeRng,
event: Self::Event,
) -> Effects<Self::Event>;
/// Creates a new reactor instance.
///
/// On success returns the reactor together with its initial effects; on failure returns
/// `Self::Error`.
fn new(
cfg: Self::Config,
registry: &Registry,
event_queue: EventQueueHandle<Self::Event>,
rng: &mut NodeRng,
) -> Result<(Self, Effects<Self::Event>), Self::Error>;
/// Indicates whether the reactor has stopped; `Runner::run` keeps cranking until this returns
/// `true`. The default implementation always returns `false`.
#[inline]
fn is_stopped(&mut self) -> bool {
false
}
/// Hook invoked periodically by `Runner::crank` to let the reactor refresh its metrics.
/// The default implementation does nothing.
fn update_metrics(&mut self, _event_queue_handle: EventQueueHandle<Self::Event>) {}
}
/// Trait for types that can be shut down asynchronously.
pub trait Finalize: Sized {
    /// Consumes the value and returns a future performing any final work.
    ///
    /// The default implementation returns a future that does nothing.
    fn finalize(self) -> BoxFuture<'static, ()> {
        let nothing_to_do = async move {};
        nothing_to_do.boxed()
    }
}
/// Drives a [`Reactor`]: pops events from the scheduler, dispatches them to the reactor and
/// schedules the resulting effects.
#[derive(Debug)]
pub struct Runner<R>
where
R: Reactor,
{
/// Scheduler holding the pending events for the reactor.
scheduler: &'static Scheduler<R::Event>,
/// The reactor instance being driven.
reactor: R,
/// Number of events processed so far; incremented at the end of every crank.
event_count: usize,
/// Time at which the reactor's `update_metrics` hook was last invoked.
last_metrics: Instant,
/// Metrics owned by the runner itself.
metrics: RunnerMetrics,
/// A metrics update is only considered when `event_count` is a multiple of this value.
event_metrics_threshold: usize,
/// Minimum time that must elapse between two metrics updates.
event_metrics_min_delay: Duration,
/// Clock used to measure how long dispatching an event takes.
clock: Clock,
}
/// Metric collectors maintained by a [`Runner`].
#[derive(Debug)]
struct RunnerMetrics {
/// Counter incremented once per crank.
events: IntCounter,
/// Histogram of the time taken to dispatch a single event, observed in nanoseconds.
event_dispatch_duration: Histogram,
/// Clone of the registry the collectors were registered with, kept so they can be
/// unregistered on drop.
registry: Registry,
}
impl RunnerMetrics {
    /// Creates the runner metrics and registers them with `registry`.
    ///
    /// # Errors
    ///
    /// Returns the `prometheus::Error` produced when a collector cannot be created or
    /// registered. If registration fails part-way, collectors that were already registered are
    /// removed again so that the registry is left as it was found.
    fn new(registry: &Registry) -> Result<Self, prometheus::Error> {
        let events = IntCounter::new("runner_events", "total event count")?;

        // Bucket upper bounds are in nanoseconds, ranging from 100 ns up to 5 ms.
        let event_dispatch_duration = Histogram::with_opts(
            HistogramOpts::new(
                "event_dispatch_duration",
                "duration of complete dispatch of a single event in nanoseconds",
            )
            .buckets(vec![
                100.0,
                500.0,
                1_000.0,
                5_000.0,
                10_000.0,
                20_000.0,
                50_000.0,
                100_000.0,
                200_000.0,
                300_000.0,
                400_000.0,
                500_000.0,
                600_000.0,
                700_000.0,
                800_000.0,
                900_000.0,
                1_000_000.0,
                2_000_000.0,
                5_000_000.0,
            ]),
        )?;

        registry.register(Box::new(events.clone()))?;
        if let Err(error) = registry.register(Box::new(event_dispatch_duration.clone())) {
            // No `RunnerMetrics` value exists yet, so `Drop` will never clean up `events`.
            // Roll the first registration back by hand; this is best-effort and the original
            // registration error is the one worth reporting.
            let _ = registry.unregister(Box::new(events.clone()));
            return Err(error);
        }

        Ok(RunnerMetrics {
            events,
            event_dispatch_duration,
            registry: registry.clone(),
        })
    }
}
impl Drop for RunnerMetrics {
fn drop(&mut self) {
self.registry
.unregister(Box::new(self.events.clone()))
.expect("did not expect deregistering events to fail");
self.registry
.unregister(Box::new(self.event_dispatch_duration.clone()))
.expect("did not expect deregistering event_dispatch_duration to fail");
}
}
impl<R> Runner<R>
where
R: Reactor,
R::Event: Serialize,
R::Error: From<prometheus::Error>,
{
#[inline]
pub async fn new(cfg: R::Config, rng: &mut NodeRng) -> Result<Self, R::Error> {
let registry = Registry::new();
Self::with_metrics(cfg, rng, ®istry).await
}
#[inline]
pub async fn with_metrics(
cfg: R::Config,
rng: &mut NodeRng,
registry: &Registry,
) -> Result<Self, R::Error> {
let event_size = mem::size_of::<R::Event>();
if event_size > 16 * mem::size_of::<usize>() {
warn!(%event_size, "large event size, consider reducing it or boxing");
}
let scheduler = utils::leak(Scheduler::new(QueueKind::weights()));
let event_queue = EventQueueHandle::new(scheduler);
let (reactor, initial_effects) = R::new(cfg, registry, event_queue, rng)?;
let span = debug_span!("process initial effects");
process_effects(scheduler, initial_effects)
.instrument(span)
.await;
info!("reactor main loop is ready");
Ok(Runner {
scheduler,
reactor,
event_count: 0,
metrics: RunnerMetrics::new(registry)?,
last_metrics: Instant::now(),
event_metrics_min_delay: Duration::from_secs(30),
event_metrics_threshold: 1000,
clock: Clock::new(),
})
}
#[cfg(test)]
pub(crate) async fn process_injected_effects<F>(&mut self, create_effects: F)
where
F: FnOnce(EffectBuilder<R::Event>) -> Effects<R::Event>,
{
let event_queue = EventQueueHandle::new(self.scheduler);
let effect_builder = EffectBuilder::new(event_queue);
let effects = create_effects(effect_builder);
let effect_span = debug_span!("process injected effects", ev = self.event_count);
process_effects(self.scheduler, effects)
.instrument(effect_span)
.await;
}
#[inline]
pub async fn crank(&mut self, rng: &mut NodeRng) {
let crank_span = debug_span!("crank", ev = self.event_count);
let _inner_enter = crank_span.enter();
self.metrics.events.inc();
let event_queue = EventQueueHandle::new(self.scheduler);
let effect_builder = EffectBuilder::new(event_queue);
if self.event_count % self.event_metrics_threshold == 0 {
let now = Instant::now();
if now.duration_since(self.last_metrics) >= self.event_metrics_min_delay
|| self.event_count == 0
{
self.reactor.update_metrics(event_queue);
self.last_metrics = now;
}
}
if crate::QUEUE_DUMP_REQUESTED.load(Ordering::SeqCst) {
debug!("dumping event queue as requested");
let output_fn = "queue_dump.json";
let mut serializer = serde_json::Serializer::pretty(
File::create(output_fn).expect("could not create output file for queue snapshot"),
);
self.scheduler
.snapshot(&mut serializer)
.await
.expect("could not serialize snapshot");
let mut file =
File::create("queue_dump_debug.txt").expect("could not create dump file");
self.scheduler
.debug_dump(&mut file)
.await
.expect("unable to dump queues to file");
crate::QUEUE_DUMP_REQUESTED.store(false, Ordering::SeqCst);
}
let (event, q) = self.scheduler.pop().await;
let event_span = debug_span!("dispatch events", ev = self.event_count);
let inner_enter = event_span.enter();
let event_as_string = format!("{}", event);
debug!(event=%event_as_string, ?q);
trace!(?event, ?q);
let start = self.clock.start();
let effects = self.reactor.dispatch_event(effect_builder, rng, event);
let end = self.clock.end();
let delta = self.clock.delta(start, end);
if delta > *DISPATCH_EVENT_THRESHOLD {
warn!(
ns = delta.into_nanos(),
event = %event_as_string,
"event took very long to dispatch"
);
}
self.metrics
.event_dispatch_duration
.observe(delta.into_nanos() as f64);
drop(inner_enter);
let effect_span = debug_span!("process effects", ev = self.event_count);
process_effects(self.scheduler, effects)
.instrument(effect_span)
.await;
self.event_count += 1;
}
#[inline]
pub async fn try_crank(&mut self, rng: &mut NodeRng) -> Option<()> {
if self.scheduler.item_count() == 0 {
None
} else {
self.crank(rng).await;
Some(())
}
}
#[inline]
pub async fn run(&mut self, rng: &mut NodeRng) {
while !self.reactor.is_stopped() {
self.crank(rng).await;
}
}
#[inline]
pub fn reactor(&self) -> &R {
&self.reactor
}
#[inline]
pub fn reactor_mut(&mut self) -> &mut R {
&mut self.reactor
}
#[inline]
pub fn into_inner(self) -> R {
self.reactor
}
}
/// Spawns one tokio task per effect; each task awaits its effect and pushes every resulting
/// event onto the default queue of `scheduler`.
///
/// The tasks are detached, so this function returns without waiting for the effects to finish.
#[inline]
async fn process_effects<Ev>(scheduler: &'static Scheduler<Ev>, effects: Effects<Ev>)
where
    Ev: Send + 'static,
{
    let queue_kind = QueueKind::default();

    effects.into_iter().for_each(|effect| {
        // Dropping the join handle detaches the task.
        let _join_handle = tokio::spawn(async move {
            let produced_events = effect.await;
            for event in produced_events {
                scheduler.push(event, queue_kind).await
            }
        });
    });
}
/// Converts an effect producing `Ev` events into one producing `REv` events by passing every
/// event through `wrap` once the original effect has completed.
#[inline]
fn wrap_effect<Ev, REv, F>(wrap: F, effect: Effect<Ev>) -> Effect<REv>
where
    F: Fn(Ev) -> REv + Send + 'static,
    Ev: Send + 'static,
    REv: Send + 'static,
{
    let wrapped = async move { effect.await.into_iter().map(wrap).collect() };
    wrapped.boxed()
}
#[inline]
pub fn wrap_effects<Ev, REv, F>(wrap: F, effects: Effects<Ev>) -> Effects<REv>
where
F: Fn(Ev) -> REv + Send + 'static + Clone,
Ev: Send + 'static,
REv: Send + 'static,
{
effects
.into_iter()
.map(move |effect| wrap_effect(wrap.clone(), effect))
.collect()
}