prometric 0.2.2

Metric types to use with the prometric-derive crate.
Documentation
//! Enables a [`Summary`] to be represented as a prometheus Summary metric

use std::{collections::HashMap, marker::PhantomData, ops::Deref, sync::Arc};

use prometheus::{
    Opts,
    core::{Desc, Describer, Metric, MetricVecBuilder},
    proto as pp,
};

use crate::summary::traits::{Summary, SummaryMetric};

use super::traits::NonConcurrentSummaryProvider;

/// Quantiles that [`SummaryOpts::new`] starts with until they are replaced through
/// [`SummaryOpts::quantiles`]
// from metrics_exporter_prometheus::PrometheusBuilder::new
pub const DEFAULT_QUANTILES: &[f64] = &[0.0, 0.5, 0.9, 0.95, 0.99, 0.999, 1.0];

/// Configuration options for [`GenericSummary`]
///
/// Groups the prometheus [`Opts`] with the provider-specific options `O` and the list of
/// quantiles to export.
#[derive(Clone)]
pub struct SummaryOpts<O> {
    /// Name, help text and labels of the metric; [`Describer::describe`] is forwarded to it
    pub common_opts: Opts,
    /// Used to initialize the specific summary provider: [`GenericSummary::new`] hands it to
    /// [`NonConcurrentSummaryProvider::new_provider`]
    pub summary_opts: O,

    /// Which quantiles to export
    pub quantiles: Vec<f64>,
}

// needed for MetricVecBuilder::P
impl<O> Describer for SummaryOpts<O> {
    /// Forwards to the description of the wrapped prometheus [`Opts`]
    fn describe(&self) -> prometheus::Result<Desc> {
        let Self { common_opts, .. } = self;
        common_opts.describe()
    }
}

impl<O> SummaryOpts<O> {
    /// Creates options for a metric called `name` with the given `help` text.
    ///
    /// `summary` is stored as the provider-specific options; the exported quantiles start
    /// out as [`DEFAULT_QUANTILES`].
    pub fn new<S1: Into<String>, S2: Into<String>>(name: S1, help: S2, summary: O) -> Self {
        let common_opts = Opts::new(name, help);
        let quantiles = DEFAULT_QUANTILES.to_vec();

        Self { common_opts, summary_opts: summary, quantiles }
    }

    /// See [`Opts::const_labels`]
    pub fn const_labels(self, const_labels: HashMap<String, String>) -> Self {
        Self { common_opts: self.common_opts.const_labels(const_labels), ..self }
    }

    /// See [`Opts::variable_labels`]
    pub fn variable_labels(self, variable_labels: Vec<String>) -> Self {
        Self { common_opts: self.common_opts.variable_labels(variable_labels), ..self }
    }

    /// Configure the quantiles to use when creating a prometheus protobuf summary
    ///
    /// Replaces the previously configured list entirely.
    pub fn quantiles<B: Into<Vec<f64>>>(mut self, quantiles: B) -> Self {
        self.quantiles = quantiles.into();
        self
    }
}

/// Uses the configured summary provider `P` to collect observations and compute quantiles
///
/// Main purpose is to wrap over the summary to convert it into a [`prometheus::proto::Summary`]
#[derive(Debug)]
pub struct GenericSummary<P> {
    /// Label pairs (variable and constant) resolved once in [`GenericSummary::new`]
    label_pairs: Vec<pp::LabelPair>,
    /// Quantiles requested from the provider's snapshot in [`GenericSummary::proto`]
    quantiles: Vec<f64>,
    /// The wrapped provider, also reachable through `Deref`
    provider: P,
}

impl<P: NonConcurrentSummaryProvider> GenericSummary<P> {
    /// Builds a summary for one concrete set of label values.
    ///
    /// The provider is created from `opts.summary_opts` and the quantile list is copied
    /// from `opts.quantiles`.
    ///
    /// # Errors
    /// Propagates the error from describing `opts.common_opts`, and the error returned
    /// when `label_values` cannot be paired with the variable label names (for example
    /// when their counts differ).
    pub fn new<V: AsRef<str>>(
        opts: &SummaryOpts<P::Opts>,
        label_values: &[V],
    ) -> prometheus::Result<Self> {
        let label_pairs = opts
            .common_opts
            .describe()
            .and_then(|desc| make_label_pairs(&desc, label_values))?;
        let quantiles = opts.quantiles.clone();
        let provider = P::new_provider(&opts.summary_opts);

        Ok(Self { label_pairs, quantiles, provider })
    }

    /// Make a snapshot of the current summary state exposed as a Protobuf struct
    ///
    /// Quantiles for which the snapshot yields no value are left out of the result.
    pub fn proto(&self) -> pp::Summary {
        let snapshot = self.provider.snapshot();

        // TODO: signal that a value was not computable instead of dropping the quantile
        let exported: Vec<pp::Quantile> = self
            .quantiles
            .iter()
            .filter_map(|&rank| {
                let value = snapshot.quantile(rank)?;

                let mut entry = pp::Quantile::default();
                entry.set_quantile(rank);
                entry.set_value(value);
                Some(entry)
            })
            .collect();

        let mut out = pp::Summary::default();
        out.set_sample_sum(snapshot.sample_sum());
        out.set_sample_count(snapshot.sample_count());
        out.set_quantile(exported);
        out
    }
}

/// Exposes the wrapped provider so its methods can be called directly on the summary
impl<P> Deref for GenericSummary<P> {
    type Target = P;

    fn deref(&self) -> &Self::Target {
        let Self { provider, .. } = self;
        provider
    }
}

/// NewType over [`GenericSummary`]
///
/// Uses `Arc` to ensure clones refer to the same data.
/// This is because [`prometheus::core::MetricVec`] will clone the metric each time it is to be
/// accessed, even when inserting new data
pub struct GenericSummaryMetric<P>(Arc<GenericSummary<P>>);

// Written by hand instead of `#[derive(Clone)]`: the derive would demand `P: Clone`, yet a
// clone only bumps the reference count and never copies the provider.
impl<P> Clone for GenericSummaryMetric<P> {
    fn clone(&self) -> Self {
        Self(Arc::clone(&self.0))
    }
}

impl<P> Deref for GenericSummaryMetric<P> {
    type Target = GenericSummary<P>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

/// Moves a freshly built summary behind an `Arc` so that it can be shared between clones
impl<P> From<GenericSummary<P>> for GenericSummaryMetric<P> {
    fn from(value: GenericSummary<P>) -> Self {
        let shared = Arc::new(value);
        Self(shared)
    }
}

impl<P: SummaryMetric> Metric for GenericSummaryMetric<P> {
    /// Snapshot of the summary as a protobuf metric carrying this instance's label pairs
    fn metric(&self) -> pp::Metric {
        let labels = self.label_pairs.clone();

        let mut metric = pp::Metric::from_label(labels);
        metric.set_summary(self.proto());
        metric
    }
}

/// Similarly to [`::prometheus::HistogramVec`], but for Summaries.
///
/// The builder stores no data; `S` only selects which summary type gets built.
pub struct SummaryVecBuilder<S> {
    _p: PhantomData<S>,
}

// Written by hand instead of `#[derive(Clone)]`, which would demand `S: Clone` even though
// no `S` is ever stored.
impl<S> Clone for SummaryVecBuilder<S> {
    fn clone(&self) -> Self {
        Self::new()
    }
}

// Companion to the argument-less `new`, so the builder also works wherever `Default` is
// expected. Hand-written for the same reason as `Clone`: no bound on `S` is needed.
impl<S> Default for SummaryVecBuilder<S> {
    fn default() -> Self {
        Self::new()
    }
}

impl<P> SummaryVecBuilder<P> {
    /// Creates the builder; equivalent to [`Default::default`]
    pub fn new() -> Self {
        Self { _p: PhantomData }
    }
}

impl<S: SummaryMetric> MetricVecBuilder for SummaryVecBuilder<S> {
    // NOTE: [`prometheus::core::MetricVec`] clones this `M` whenever it is to be returned,
    // given a set of label.
    // Therefore, we want this clone to refer to the same instance of the data.
    type M = GenericSummaryMetric<S>;
    type P = SummaryOpts<S::Opts>;

    fn build<V: AsRef<str>>(&self, opts: &Self::P, vals: &[V]) -> prometheus::Result<Self::M> {
        GenericSummary::<S>::new(opts, vals).map(Into::into)
    }
}

// from prometheus::value::make_label_pairs
fn make_label_pairs<V: AsRef<str>>(
    desc: &Desc,
    label_values: &[V],
) -> prometheus::Result<Vec<pp::LabelPair>> {
    if desc.variable_labels.len() != label_values.len() {
        return Err(prometheus::Error::InconsistentCardinality {
            expect: desc.variable_labels.len(),
            got: label_values.len(),
        });
    }

    let total_len = desc.variable_labels.len() + desc.const_label_pairs.len();
    if total_len == 0 {
        return Ok(vec![]);
    }

    if desc.variable_labels.is_empty() {
        return Ok(desc.const_label_pairs.clone());
    }

    let mut label_pairs = Vec::with_capacity(total_len);
    for (i, n) in desc.variable_labels.iter().enumerate() {
        let mut label_pair = pp::LabelPair::default();
        label_pair.set_name(n.clone());
        label_pair.set_value(label_values[i].as_ref().to_owned());
        label_pairs.push(label_pair);
    }

    for label_pair in &desc.const_label_pairs {
        label_pairs.push(label_pair.clone());
    }
    label_pairs.sort();
    Ok(label_pairs)
}

#[cfg(test)]
mod tests {
    use crate::{
        batching::{BatchOpts, BatchedSummary},
        rolling::{RollingSummary, RollingSummaryOpts},
        simple::{SimpleSummary, SimpleSummaryOpts},
        traits::SummaryProvider,
    };

    use super::*;

    /// Number of observations fed into every summary under test
    const MEASUREMENTS: usize = 50_000;
    /// The duration of one `observe` call is printed once per this many observations
    const PRINT_EVERY: usize = 100;

    impl<P> GenericSummary<P> {
        /// Test-only mutable access to the wrapped provider
        pub fn inner_mut(&mut self) -> &mut P {
            let Self { provider, .. } = self;
            provider
        }
    }

    /// Prints the time elapsed since `started` for every `PRINT_EVERY`-th iteration
    fn report_elapsed(iteration: usize, started: std::time::Instant) {
        if iteration % PRINT_EVERY == 0 {
            println!("Time taken: {:?}", started.elapsed());
        }
    }

    /// Checks that the summary saw every observation that was fed into it
    fn assert_complete(sample_count: u64) {
        assert_eq!(
            sample_count,
            MEASUREMENTS as u64,
            "Should have all measurements present in the collection"
        );
    }

    /// Feeds `MEASUREMENTS` observations through mutable access to the provider
    fn measure_mut<S: NonConcurrentSummaryProvider>(mut summary: GenericSummary<S>) {
        for iteration in 0..MEASUREMENTS {
            let started = std::time::Instant::now();
            summary.inner_mut().observe(iteration as f64);
            report_elapsed(iteration, started);
        }

        assert_complete(summary.snapshot().sample_count());
    }

    /// Feeds `MEASUREMENTS` observations through shared access to the provider
    fn measure<S: SummaryProvider>(summary: GenericSummary<S>) {
        for iteration in 0..MEASUREMENTS {
            let started = std::time::Instant::now();
            summary.observe(iteration as f64);
            report_elapsed(iteration, started);
        }

        assert_complete(summary.snapshot().sample_count());
    }

    #[test]
    fn with_simple_summary() {
        let provider_opts = SimpleSummaryOpts::default();
        let opts = SummaryOpts::new("test_summary", "simple", provider_opts)
            .quantiles(DEFAULT_QUANTILES.to_vec());

        measure_mut(GenericSummary::<SimpleSummary>::new::<&str>(&opts, &[]).unwrap());
    }

    #[test]
    fn with_batched_simple_summary() {
        let provider_opts = BatchOpts::from_inner(SimpleSummaryOpts::default());
        let opts = SummaryOpts::new("test_summary", "batched_simple", provider_opts)
            .quantiles(DEFAULT_QUANTILES.to_vec());

        measure(GenericSummary::<BatchedSummary<SimpleSummary>>::new::<&str>(&opts, &[]).unwrap());
    }

    #[test]
    fn with_rolling_summary() {
        let provider_opts = RollingSummaryOpts::default().with_quantiles(DEFAULT_QUANTILES);
        let opts = SummaryOpts::new("test_summary", "rolling", provider_opts)
            .quantiles(DEFAULT_QUANTILES.to_vec());

        measure_mut(GenericSummary::<RollingSummary>::new::<&str>(&opts, &[]).unwrap());
    }

    #[test]
    fn with_batched_rolling_summary() {
        let rolling_opts = RollingSummaryOpts::default().with_quantiles(DEFAULT_QUANTILES);
        let provider_opts = BatchOpts::from_inner(rolling_opts);
        let opts = SummaryOpts::new("test_summary", "batched_rolling", provider_opts)
            .quantiles(DEFAULT_QUANTILES.to_vec());

        measure(GenericSummary::<BatchedSummary<RollingSummary>>::new::<&str>(&opts, &[]).unwrap());
    }
}