#![deny(clippy::arithmetic_side_effects)]
use histogram::Config;
use metrique_core::CloseValue;
use metrique_writer::{Distribution, MetricFlags, MetricValue, Observation, Value, ValueWriter};
use ordered_float::OrderedFloat;
use smallvec::SmallVec;
use std::{borrow::Borrow, marker::PhantomData};
use crate::traits::AggregateValue;
/// Accumulates `f64` observations through exclusive (`&mut self`) access.
///
/// Implementors only have to supply [`record_many`](Self::record_many) and
/// [`drain`](Self::drain); single-sample recording is derived from the former.
pub trait AggregationStrategy {
    /// Records a single occurrence of `value`.
    ///
    /// The default implementation forwards to `record_many` with a count of one.
    fn record(&mut self, value: f64) {
        Self::record_many(self, value, 1);
    }

    /// Records `count` occurrences of `value`.
    fn record_many(&mut self, value: f64, count: u64);

    /// Returns everything accumulated so far as a list of observations.
    fn drain(&mut self) -> Vec<Observation>;
}
/// Accumulates `f64` observations through a shared reference (`&self`).
///
/// Mirrors `AggregationStrategy`, except that every method takes `&self`
/// instead of `&mut self`.
pub trait SharedAggregationStrategy {
    /// Records a single occurrence of `value`.
    ///
    /// The default implementation forwards to `record_many` with a count of one.
    fn record(&self, value: f64) {
        Self::record_many(self, value, 1);
    }

    /// Records `count` occurrences of `value`.
    fn record_many(&self, value: f64, count: u64);

    /// Returns everything accumulated so far as a list of observations.
    fn drain(&self) -> Vec<Observation>;
}
/// Collects observations extracted from values of metric type `T` into an
/// aggregation strategy `S` (`ExponentialAggregationStrategy` unless another
/// strategy is named).
pub struct Histogram<T, S = ExponentialAggregationStrategy> {
/// The strategy that stores the recorded observations.
strategy: S,
/// Zero-sized marker tying the histogram to `T` without storing a `T`.
_value: PhantomData<T>,
}
impl<T, S: AggregationStrategy> Histogram<T, S> {
    /// Builds a histogram that records into the given `strategy`.
    pub fn new(strategy: S) -> Self {
        Self {
            strategy,
            _value: PhantomData,
        }
    }

    /// Feeds every observation written by `value` into the strategy.
    ///
    /// `value` may be owned or borrowed. Only the numeric observations it
    /// emits are used: strings, units, dimensions, flags and validation
    /// errors are ignored.
    pub fn add_value(&mut self, value: impl Borrow<T>)
    where
        T: MetricValue,
    {
        // Writer adapter that forwards each observation into the strategy.
        struct StrategySink<'s, A>(&'s mut A);

        impl<A: AggregationStrategy> ValueWriter for StrategySink<'_, A> {
            fn string(self, _value: &str) {}

            fn metric<'a>(
                self,
                distribution: impl IntoIterator<Item = Observation>,
                _unit: metrique_writer::Unit,
                _dimensions: impl IntoIterator<Item = (&'a str, &'a str)>,
                _flags: MetricFlags<'_>,
            ) {
                let Self(strategy) = self;
                for observation in distribution {
                    match observation {
                        Observation::Unsigned(raw) => strategy.record(raw as f64),
                        Observation::Floating(raw) => strategy.record(raw),
                        // A repeated observation is replayed as `occurrences`
                        // copies of its mean; empty ones fall through below.
                        Observation::Repeated { total, occurrences } if occurrences > 0 => {
                            let mean = total / occurrences as f64;
                            strategy.record_many(mean, occurrences);
                        }
                        _ => {}
                    }
                }
            }

            fn error(self, _error: metrique_writer::ValidationError) {}
        }

        let borrowed: &T = value.borrow();
        borrowed.write(StrategySink(&mut self.strategy));
    }
}
impl<T, S: Default + AggregationStrategy> Default for Histogram<T, S> {
fn default() -> Self {
Self::new(S::default())
}
}
impl<T: MetricValue, S: AggregationStrategy> CloseValue for Histogram<T, S> {
    type Closed = HistogramClosed<T>;

    /// Consumes the histogram and captures whatever its strategy drains.
    fn close(mut self) -> Self::Closed {
        let observations = self.strategy.drain();
        HistogramClosed {
            observations,
            _value: PhantomData,
        }
    }
}
/// Histogram for values of metric type `T` whose strategy `S` is used through
/// a shared reference (`AtomicExponentialAggregationStrategy` unless another
/// strategy is named).
pub struct SharedHistogram<T, S = AtomicExponentialAggregationStrategy> {
/// The strategy that stores the recorded observations.
strategy: S,
/// Zero-sized marker tying the histogram to `T` without storing a `T`.
_value: PhantomData<T>,
}
impl<T, S: Default> Default for SharedHistogram<T, S> {
fn default() -> Self {
Self {
strategy: Default::default(),
_value: Default::default(),
}
}
}
impl<T, S: SharedAggregationStrategy> SharedHistogram<T, S> {
    /// Builds a shared histogram that records into the given `strategy`.
    pub fn new(strategy: S) -> Self {
        Self {
            strategy,
            _value: PhantomData,
        }
    }

    /// Feeds every observation written by `value` into the strategy.
    ///
    /// Only the numeric observations `value` emits are used: strings, units,
    /// dimensions, flags and validation errors are ignored.
    pub fn add_value(&self, value: T)
    where
        T: MetricValue,
    {
        // Writer adapter that forwards each observation into the strategy.
        struct StrategySink<'s, A>(&'s A);

        impl<A: SharedAggregationStrategy> ValueWriter for StrategySink<'_, A> {
            fn string(self, _value: &str) {}

            fn metric<'a>(
                self,
                distribution: impl IntoIterator<Item = Observation>,
                _unit: metrique_writer::Unit,
                _dimensions: impl IntoIterator<Item = (&'a str, &'a str)>,
                _flags: MetricFlags<'_>,
            ) {
                let Self(strategy) = self;
                distribution
                    .into_iter()
                    .for_each(|observation| match observation {
                        Observation::Unsigned(raw) => strategy.record(raw as f64),
                        Observation::Floating(raw) => strategy.record(raw),
                        // A repeated observation is replayed as `occurrences`
                        // copies of its mean; empty ones fall through below.
                        Observation::Repeated { total, occurrences } if occurrences > 0 => {
                            let mean = total / occurrences as f64;
                            strategy.record_many(mean, occurrences);
                        }
                        _ => {}
                    });
            }

            fn error(self, _error: metrique_writer::ValidationError) {}
        }

        value.write(StrategySink(&self.strategy));
    }
}
impl<T: MetricValue, S: SharedAggregationStrategy> CloseValue for SharedHistogram<T, S> {
    type Closed = HistogramClosed<T>;

    /// Consumes the histogram and captures whatever its strategy drains.
    fn close(self) -> Self::Closed {
        let observations = self.strategy.drain();
        HistogramClosed {
            observations,
            _value: PhantomData,
        }
    }
}
/// The closed form of `Histogram` / `SharedHistogram`: the observations
/// drained from the strategy, still tagged with the metric type `T`.
pub struct HistogramClosed<T> {
/// Observations produced by the strategy's `drain` at close time.
observations: Vec<Observation>,
/// Zero-sized marker carrying `T`, whose unit is used when writing.
_value: PhantomData<T>,
}
impl<T> Value for HistogramClosed<T>
where
    T: MetricValue,
{
    /// Writes the stored observations as a single metric.
    ///
    /// The metric uses `T`'s unit, carries no dimensions, and is flagged as a
    /// `Distribution`.
    fn write(&self, writer: impl ValueWriter) {
        use metrique_writer::unit::UnitTag;

        let observations = self.observations.iter().copied();
        let unit = T::Unit::UNIT;
        writer.metric(observations, unit, [], MetricFlags::upcast(&Distribution))
    }
}
/// A closed histogram reports in the same unit as the values it aggregated.
impl<T: MetricValue> MetricValue for HistogramClosed<T> {
    type Unit = T::Unit;
}
/// Factor (2^10) applied to values before they are stored as integers, so that
/// fractional values keep some precision in the integer histogram.
const SCALING_FACTOR: f64 = 1024.0;

/// Multiplies `v` by `SCALING_FACTOR`.
fn scale_up(v: impl Into<f64>) -> f64 {
    let raw: f64 = v.into();
    raw * SCALING_FACTOR
}

/// Divides `v` by `SCALING_FACTOR`; the inverse of `scale_up`.
fn scale_down(v: impl Into<f64>) -> f64 {
    let scaled: f64 = v.into();
    scaled / SCALING_FACTOR
}
/// Aggregation strategy that stores observations in a bucketed
/// `histogram::Histogram` built from `default_histogram_config()`.
pub struct ExponentialAggregationStrategy {
/// The bucketed histogram holding scaled-up integer values.
inner: histogram::Histogram,
}
impl ExponentialAggregationStrategy {
    /// Creates an empty strategy using the default histogram configuration.
    pub fn new() -> Self {
        Self {
            inner: histogram::Histogram::with_config(&default_histogram_config()),
        }
    }
}
impl Default for ExponentialAggregationStrategy {
fn default() -> Self {
Self::new()
}
}
/// Returns the histogram configuration shared by the exponential strategies.
///
/// The arguments `4` and `64` are constants, so a failure to build the
/// configuration is treated as a bug and panics.
fn default_histogram_config() -> Config {
    let config = Config::new(4, 64);
    config.expect("known good")
}
impl AggregationStrategy for ExponentialAggregationStrategy {
    /// Records `count` occurrences of `value` in the bucketed histogram.
    ///
    /// The value is scaled up with `scale_up` and converted to `u64`. Values
    /// above `u64::MAX` are clamped to it, and negative values saturate to `0`
    /// in the float-to-integer cast. `NaN` samples are ignored, matching
    /// `SortAndMerge`, which also drops them.
    ///
    /// Any error returned by the underlying histogram is deliberately
    /// discarded; recording is best-effort.
    fn record_many(&mut self, value: f64, count: u64) {
        // `f64::min` returns the non-NaN operand, so without this guard a NaN
        // would be clamped to `u64::MAX` and counted in the highest bucket.
        if value.is_nan() {
            return;
        }
        let scaled = scale_up(value);
        self.inner
            .add(scaled.min(u64::MAX as f64) as u64, count)
            .ok();
    }

    /// Takes the accumulated histogram, leaving a fresh empty one in its
    /// place, and reports each non-empty bucket as one repeated observation.
    ///
    /// Every sample in a bucket is reported at the bucket's midpoint (scaled
    /// back down with `scale_down`), so `total` is `midpoint * count`.
    fn drain(&mut self) -> Vec<Observation> {
        let snapshot = std::mem::replace(
            &mut self.inner,
            histogram::Histogram::with_config(&default_histogram_config()),
        );
        snapshot
            .iter()
            .filter(|bucket| bucket.count() > 0)
            .map(|bucket| {
                let range = bucket.range();
                let midpoint = range.start().midpoint(*range.end());
                let midpoint = scale_down(midpoint as f64);
                Observation::Repeated {
                    total: midpoint * bucket.count() as f64,
                    occurrences: bucket.count(),
                }
            })
            .collect()
    }
}
/// Aggregation strategy that keeps every recorded sample and, when drained,
/// sorts them and merges equal values into repeated observations.
///
/// `N` is the inline capacity of the backing `SmallVec` (32 by default).
#[derive(Default)]
pub struct SortAndMerge<const N: usize = 32> {
/// Every recorded sample, one element per occurrence, in insertion order.
values: SmallVec<[f64; N]>,
}
impl<const N: usize> SortAndMerge<N> {
    /// Creates a strategy with no recorded samples.
    pub fn new() -> Self {
        let values = SmallVec::new();
        Self { values }
    }
}
impl<const N: usize> AggregationStrategy for SortAndMerge<N> {
    /// Appends `count` copies of `value` to the sample buffer.
    ///
    /// NOTE(review): `count as usize` truncates on targets where `usize` is
    /// narrower than 64 bits, and large counts allocate one slot per
    /// occurrence. Confirm callers keep counts small.
    fn record_many(&mut self, value: f64, count: u64) {
        let copies = count as usize;
        self.values.extend(std::iter::repeat(value).take(copies));
    }

    /// Sorts the buffered samples, collapses each run of equal values into one
    /// `Observation::Repeated`, and empties the buffer.
    ///
    /// `NaN` samples are skipped. For every run, `total` is the run's value
    /// multiplied by its length.
    fn drain(&mut self) -> Vec<Observation> {
        self.values.sort_by_key(|sample| OrderedFloat(*sample));

        let emit = |value: f64, seen: u64| Observation::Repeated {
            total: value * seen as f64,
            occurrences: seen,
        };

        let mut merged = Vec::new();
        // The run currently being counted: (value, occurrences so far).
        let mut run: Option<(f64, u64)> = None;
        for sample in self.values.iter().copied().filter(|s| !s.is_nan()) {
            run = match run {
                Some((value, seen)) if sample == value => Some((value, seen.saturating_add(1))),
                finished => {
                    if let Some((value, seen)) = finished {
                        merged.push(emit(value, seen));
                    }
                    Some((sample, 1))
                }
            };
        }
        if let Some((value, seen)) = run {
            merged.push(emit(value, seen));
        }

        self.values.clear();
        merged
    }
}
/// Shared (`&self`) aggregation strategy that stores observations in a
/// bucketed `histogram::AtomicHistogram` built from
/// `default_histogram_config()`.
pub struct AtomicExponentialAggregationStrategy {
/// The atomic bucketed histogram holding scaled-up integer values.
inner: histogram::AtomicHistogram,
}
impl AtomicExponentialAggregationStrategy {
    /// Creates an empty strategy using the default histogram configuration.
    pub fn new() -> Self {
        let config = default_histogram_config();
        let inner = histogram::AtomicHistogram::with_config(&config);
        Self { inner }
    }
}
impl Default for AtomicExponentialAggregationStrategy {
fn default() -> Self {
Self::new()
}
}
impl SharedAggregationStrategy for AtomicExponentialAggregationStrategy {
    /// Records `count` occurrences of `value` in the atomic bucketed histogram.
    ///
    /// The value is scaled up with `scale_up` and converted to `u64`. Values
    /// above `u64::MAX` are clamped to it, and negative values saturate to `0`
    /// in the float-to-integer cast. `NaN` samples are ignored, matching
    /// `SortAndMerge`, which also drops them.
    ///
    /// Any error returned by the underlying histogram is deliberately
    /// discarded; recording is best-effort.
    fn record_many(&self, value: f64, count: u64) {
        // `f64::min` returns the non-NaN operand, so without this guard a NaN
        // would be clamped to `u64::MAX` and counted in the highest bucket.
        if value.is_nan() {
            return;
        }
        let scaled = scale_up(value);
        self.inner
            .add(scaled.min(u64::MAX as f64) as u64, count)
            .ok();
    }

    /// Drains the atomic histogram and reports each non-empty bucket as one
    /// repeated observation.
    ///
    /// Every sample in a bucket is reported at the bucket's midpoint (scaled
    /// back down with `scale_down`), so `total` is `midpoint * count`.
    fn drain(&self) -> Vec<Observation> {
        self.inner
            .drain()
            .iter()
            .filter(|bucket| bucket.count() > 0)
            .map(|bucket| {
                let range = bucket.range();
                let midpoint = range.start().midpoint(*range.end());
                let midpoint = scale_down(midpoint as f64);
                Observation::Repeated {
                    total: midpoint * bucket.count() as f64,
                    occurrences: bucket.count(),
                }
            })
            .collect()
    }
}
impl<T, S> AggregateValue<T> for Histogram<T, S>
where
T: MetricValue,
S: AggregationStrategy + Default,
{
type Aggregated = Histogram<T, S>;
fn insert(accum: &mut Self::Aggregated, value: T) {
accum.add_value(value);
}
}
/// Aggregating already-closed histograms: their observations are replayed
/// into a `Histogram<T, S>` accumulator.
impl<T, S> AggregateValue<HistogramClosed<T>> for Histogram<T, S>
where
    T: MetricValue,
    S: AggregationStrategy + Default,
{
    type Aggregated = Histogram<T, S>;

    /// Replays every observation stored in `value` into the accumulator's
    /// strategy.
    fn insert(accum: &mut Self::Aggregated, value: HistogramClosed<T>) {
        let strategy = &mut accum.strategy;
        value
            .observations
            .into_iter()
            .for_each(|observation| match observation {
                // A repeated observation is replayed as `occurrences` copies
                // of its mean; empty ones fall through to the catch-all arm.
                Observation::Repeated { total, occurrences } if occurrences > 0 => {
                    let mean = total / occurrences as f64;
                    strategy.record_many(mean, occurrences);
                }
                Observation::Unsigned(raw) => strategy.record(raw as f64),
                Observation::Floating(raw) => strategy.record(raw),
                _ => {}
            });
    }
}
#[cfg(test)]
mod tests {
    use assert2::check;
    use metrique_writer::Observation;

    use crate::histogram::{
        AggregationStrategy, AtomicExponentialAggregationStrategy, ExponentialAggregationStrategy,
        SharedAggregationStrategy, default_histogram_config, scale_down, scale_up,
    };

    /// What both exponential strategies are expected to drain after a single
    /// `f64::MAX` sample has been recorded.
    fn expected_for_f64_max() -> Vec<Observation> {
        vec![Observation::Repeated {
            total: 1.7732923532771328e16,
            occurrences: 1,
        }]
    }

    /// Recording `f64::MAX` must not fail and lands in one bucket.
    #[test]
    fn test_histogram_max_values() {
        let mut strategy = ExponentialAggregationStrategy::new();
        strategy.record(f64::MAX);
        let drained = strategy.drain();
        check!(drained == expected_for_f64_max());
    }

    /// Same as above for the atomic strategy.
    #[test]
    fn test_atomic_histogram_max_values() {
        let strategy = AtomicExponentialAggregationStrategy::new();
        strategy.record(f64::MAX);
        let drained = strategy.drain();
        check!(drained == expected_for_f64_max());
    }

    /// Pins the bucket count of the default configuration.
    #[test]
    fn num_buckets() {
        let buckets = default_histogram_config().total_buckets();
        check!(buckets == 976);
    }

    /// Scaling up and back down returns the original value.
    #[test]
    fn test_scaling() {
        let original = 0.001;
        let round_tripped = scale_down(scale_up(original));
        check!(round_tripped == original);
    }
}