use std::sync::Arc;
use fxhash::FxHashMap;
use metrics::{Key, Unit};
use sketches_ddsketch::{Config as DDSketchConfig, DDSketch};
use tracing::warn;
use elfo_core::{message, ActorMeta, Local};
use crate::stats::SnapshotStats;
#[message(ret = Rendered)]
pub(crate) struct Render;
#[message]
pub(crate) struct Rendered(#[serde(serialize_with = "elfo_core::dumping::hide")] pub(crate) String);
#[message]
pub(crate) struct ServerFailed(pub(crate) String);
#[message(ret = Local<Arc<Snapshot>>)]
#[non_exhaustive]
pub(crate) struct GetSnapshot;
pub(crate) type GaugeEpoch = u64;
pub(crate) struct Description {
pub(crate) details: Option<&'static str>,
pub(crate) unit: Option<Unit>,
}
#[derive(Default, Clone)]
pub struct Snapshot {
pub global: Metrics,
pub groupwise: FxHashMap<String, Metrics>,
pub actorwise: FxHashMap<Arc<ActorMeta>, Metrics>,
}
impl Snapshot {
pub(crate) fn reset_distributions(&mut self) {
let global = self.global.histograms.values_mut();
let groupwise = self
.groupwise
.values_mut()
.flat_map(|m| m.histograms.values_mut());
let actorwise = self
.actorwise
.values_mut()
.flat_map(|m| m.histograms.values_mut());
for d in global.chain(groupwise).chain(actorwise) {
d.reset();
}
}
pub(crate) fn emit_stats(&self) {
let mut stats = SnapshotStats::new::<Self>();
stats.add_registry(&self.groupwise);
stats.add_registry(&self.actorwise);
std::iter::once(&self.global)
.chain(self.groupwise.values())
.chain(self.actorwise.values())
.for_each(|metrics| {
stats.add_registry(&metrics.counters);
stats.add_registry(&metrics.gauges);
stats.add_registry(&metrics.histograms);
metrics.histograms.values().for_each(|d| {
stats.add_additional_size(d.sketch_size());
});
});
stats.emit();
}
}
#[derive(Default, Clone)]
pub struct Metrics {
pub counters: FxHashMap<Key, u64>,
pub gauges: FxHashMap<Key, (f64, GaugeEpoch)>,
pub histograms: FxHashMap<Key, Distribution>,
}
#[derive(Clone)]
pub struct Distribution {
sketch: Arc<DDSketch>,
cumulative_sum: f64,
cumulative_count: usize,
}
impl Default for Distribution {
fn default() -> Self {
Self {
sketch: make_ddsketch(),
cumulative_sum: 0.0,
cumulative_count: 0,
}
}
}
impl Distribution {
#[inline]
pub fn quantile(&self, q: f64) -> Option<f64> {
self.sketch.quantile(q).unwrap_or_else(|err| {
warn!(error = %err, "failed to calculate a quantile");
None
})
}
#[inline]
pub fn min(&self) -> Option<f64> {
self.sketch.min()
}
#[inline]
pub fn max(&self) -> Option<f64> {
self.sketch.max()
}
#[inline]
pub fn cumulative_count(&self) -> usize {
self.cumulative_count + self.sketch.count()
}
#[inline]
pub fn cumulative_sum(&self) -> f64 {
self.cumulative_sum + self.sketch.sum().unwrap_or_default()
}
pub(crate) fn add(&mut self, samples: &[f64]) {
let sketch = Arc::make_mut(&mut self.sketch);
samples
.iter()
.filter(|v| f64::is_finite(**v))
.for_each(|v| sketch.add(*v));
}
fn reset(&mut self) {
self.cumulative_sum += self.sketch.sum().unwrap_or_default();
self.cumulative_count += self.sketch.count();
self.sketch = make_ddsketch();
}
fn sketch_size(&self) -> usize {
std::mem::size_of::<DDSketch>() + 8 * self.sketch.length()
}
}
fn make_ddsketch() -> Arc<DDSketch> {
let max_error = 0.01; let max_bins = 8192; let min_value = 1e-9;
let config = DDSketchConfig::new(max_error, max_bins, min_value);
Arc::new(DDSketch::new(config))
}