metrix 0.13.13

metrics for application monitoring
Documentation
use std::cell::RefCell;

use crate::instruments::{
    fundamentals::buckets::SecondsBuckets, AcceptAllLabels, Instrument, LabelFilter,
    LabelPredicate, Update, Updates,
};
use crate::snapshot::Snapshot;
use crate::util;
use crate::{Descriptive, ObservedValue, PutsSnapshot, TimeUnit};
pub use gauge_adapter::*;
use tracking::*;

mod gauge_adapter;
mod tracking;

/// Simply returns the value that has been observed last.
///
/// Reacts  `Observation::Observation::ObservedOneValue`
/// with the following values:
/// * `ObservedValue::ChangedBy` whoich increments or decrements the value
/// * All `ObservedValue`s tha can be converted to an `i64` which
/// directly set the value
///
/// # Examples
///
/// ```
/// # use std::time::Instant;
/// # use metrix::instruments::*;
///
/// let mut gauge = Gauge::new_with_defaults("example");
/// assert_eq!(None, gauge.get());
/// let update = Update::ObservationWithValue(12.into(), Instant::now());
/// gauge.update(&update);
///
/// assert_eq!(Some(12), gauge.get());
/// ```
///
/// ```
/// # use std::time::Instant;
/// # use metrix::instruments::*;
/// use metrix::{ChangeBy, Increment, Decrement};
///
/// let mut gauge = Gauge::new_with_defaults("example");
/// gauge.set(12.into());
/// assert_eq!(Some(12), gauge.get());
///
/// let update = Update::ObservationWithValue(Increment.into(), Instant::now());
/// gauge.update(&update);
/// assert_eq!(Some(13), gauge.get());
///
/// let update = Update::ObservationWithValue(Decrement.into(), Instant::now());
/// gauge.update(&update);
/// assert_eq!(Some(12), gauge.get());
///
/// let update = Update::ObservationWithValue(ChangeBy(-12).into(), Instant::now());
/// gauge.update(&update);
/// assert_eq!(Some(0), gauge.get());
/// ```
pub struct Gauge {
    name: String,
    title: Option<String>,
    description: Option<String>,
    value: Option<i64>,
    tracking: Option<RefCell<SecondsBuckets<Bucket>>>,
    display_time_unit: TimeUnit,
    group_values: bool,
}

impl Gauge {
    pub fn new<T: Into<String>>(name: T) -> Gauge {
        Gauge {
            name: name.into(),
            title: None,
            description: None,
            value: None,
            tracking: None,
            display_time_unit: TimeUnit::default(),
            group_values: false,
        }
    }

    pub fn new_with_defaults<T: Into<String>>(name: T) -> Gauge {
        Self::new(name)
    }

    pub fn get_name(&self) -> &str {
        &self.name
    }

    pub fn set_name<T: Into<String>>(&mut self, name: T) {
        self.name = name.into();
    }

    pub fn name<T: Into<String>>(mut self, name: T) -> Self {
        self.set_name(name);
        self
    }

    /// Sets the initial value and returns `self`
    pub fn value<V: Into<i64>>(mut self, value: V) -> Self {
        let value: i64 = value.into();
        self.set(ObservedValue::SignedInteger(value));
        self
    }

    /// If set to true, this instrument will form
    /// a group with its name.
    ///
    /// The current value will be named `current` and other values
    /// will not be prefixed with the name of the instrument
    pub fn group_values(mut self, group_values: bool) -> Self {
        self.group_values = group_values;
        self
    }

    pub fn set_title<T: Into<String>>(&mut self, title: T) {
        self.title = Some(title.into())
    }

    pub fn title<T: Into<String>>(mut self, title: T) -> Self {
        self.set_title(title);
        self
    }

    pub fn set_description<T: Into<String>>(&mut self, description: T) {
        self.description = Some(description.into())
    }

    pub fn description<T: Into<String>>(mut self, description: T) -> Self {
        self.set_description(description);
        self
    }

    /// Enables tracking of value for the last `for_seconds` seconds.
    /// For each interval of a second there will be a record that tracks
    /// the minimum and maximum values of the gauge within an interval.
    /// Also a sum and a counter to calculate averages will be recorded.
    ///
    /// If tracking is enabled, the following fields will be added:
    ///
    /// * `[gauge_name]_peak`: The peak value of all records
    /// * `[gauge_name]_peak_min`: The smallest of the peak values of all records
    /// * `[gauge_name]_peak_avg`: The average of the peak values for all records
    /// * `[gauge_name]_bottom`: The bottom value of all records
    /// * `[gauge_name]_bottom_max`: The biggest bottom value of all records
    /// * `[gauge_name]_bottom_avg`: The average of all bottom values for all records
    /// * `[gauge_name]_avg`: The average of all values for all records
    pub fn tracking(mut self, for_seconds: usize) -> Self {
        self.set_tracking(for_seconds);
        self
    }

    /// Enables tracking of value for the last `for_seconds` seconds.
    /// For each interval of a second there will be a record that tracks
    /// the minimum and maximum values of the gauge within an interval.
    /// Also a sum and a counter to calculate averages will be recorded.
    ///
    /// If tracking is enabled, the following fields will be added:
    ///
    /// * `[gauge_name]_peak`: The peak value of all records
    /// * `[gauge_name]_peak_min`: The smallest of the peak values of all records
    /// * `[gauge_name]_peak_avg`: The average of the peak values for all records
    /// * `[gauge_name]_bottom`: The bottom value of all records
    /// * `[gauge_name]_bottom_max`: The biggest bottom value of all records
    /// * `[gauge_name]_bottom_avg`: The average of all bottom values for all records
    /// * `[gauge_name]_avg`: The average of all values for all records
    pub fn set_tracking(&mut self, for_seconds: usize) {
        if for_seconds != 0 {
            self.tracking = Some(RefCell::new(SecondsBuckets::new(for_seconds)))
        }
    }

    pub fn set_display_time_unit(&mut self, display_time_unit: TimeUnit) {
        self.display_time_unit = display_time_unit
    }

    pub fn display_time_unit(mut self, display_time_unit: TimeUnit) -> Self {
        self.set_display_time_unit(display_time_unit);
        self
    }

    pub fn accept<L: Eq + Send + 'static, F: Into<LabelFilter<L>>>(
        self,
        accept: F,
    ) -> GaugeAdapter<L> {
        GaugeAdapter::accept(accept, self)
    }

    /// Creates an `GaugeAdapter` that makes this instrument
    /// react on observations on the given label.
    pub fn for_label<L: Eq + Send + 'static>(self, label: L) -> GaugeAdapter<L> {
        self.accept(label)
    }

    /// Creates an `GaugeAdapter` that makes this instrument
    /// react on observations with the given labels.
    ///
    /// If `labels` is empty the instrument will not react to any observations
    pub fn for_labels<L: Eq + Send + 'static>(self, labels: Vec<L>) -> GaugeAdapter<L> {
        self.accept(labels)
    }

    /// Creates an `GaugeAdapter` that makes this instrument react on
    /// all observations.
    pub fn for_all_labels<L: Eq + Send + 'static>(self) -> GaugeAdapter<L> {
        self.accept(AcceptAllLabels)
    }

    /// Creates a `GaugeAdapter` that makes this instrument react on
    /// observations with labels specified by the predicate.
    pub fn for_labels_by_predicate<L, P>(self, label_predicate: P) -> GaugeAdapter<L>
    where
        L: Eq + Send + 'static,
        P: Fn(&L) -> bool + Send + 'static,
    {
        self.accept(LabelPredicate(label_predicate))
    }

    /// Creates an `GaugeAdapter` that makes this instrument to rect to no
    /// observations.
    pub fn adapter<L: Eq + Send + 'static>(self) -> GaugeAdapter<L> {
        GaugeAdapter::deaf(self)
    }

    pub fn deltas_only<L: Eq + Send + 'static, F: Into<LabelFilter<L>>>(
        self,
        accept: F,
    ) -> GaugeAdapter<L> {
        GaugeAdapter::deltas_only(accept, self)
    }

    /// Creates a new adapter which dispatches observations
    /// with the given label to the `Gauge`
    ///
    /// The Gauge will only be dispatched message that increment or
    /// decrement the value
    pub fn for_label_deltas_only<L: Eq + Send + 'static>(self, label: L) -> GaugeAdapter<L> {
        self.deltas_only(label)
    }

    /// Creates a new adapter which dispatches observations
    /// with the given labels to the `Gauge`
    ///
    /// If `labels` is empty, no observations will be dispatched
    ///
    /// The Gauge will only be dispatched message that increment or
    /// decrement the value
    pub fn for_labels_deltas_only<L: Eq + Send + 'static>(self, labels: Vec<L>) -> GaugeAdapter<L> {
        self.deltas_only(labels)
    }

    pub fn for_labels_deltas_only_by_predicate<L, P>(self, label_predicate: P) -> GaugeAdapter<L>
    where
        L: Eq + Send + 'static,
        P: Fn(&L) -> bool + Send + 'static,
    {
        self.deltas_only(LabelPredicate(label_predicate))
    }

    /// Creates a new adapter which dispatches observations
    /// with the given labels to the `Gauge` which only increment or
    /// decrement the `Gauge`
    ///
    /// The `Gauge` will increment on any observation with label `increment_on`
    /// and decrement for any observation with label `decrement_on`.
    ///
    /// `increment_on` is evaluated first so
    /// `increment_on` and `decrement_on` should not be the same label.
    pub fn inc_dec_on<
        L: Eq + Send + 'static,
        INCR: Into<LabelFilter<L>>,
        DECR: Into<LabelFilter<L>>,
    >(
        self,
        accept_incr: INCR,
        accept_decr: DECR,
    ) -> GaugeAdapter<L> {
        GaugeAdapter::inc_dec_on(accept_incr, accept_decr, self)
    }

    /// Creates a new adapter which dispatches observations
    /// with the given labels to the `Gauge` which only increment or
    /// decrement the `Gauge`
    ///
    /// The `Gauge` will increment on any observation with a label in `increment_on`
    /// and decrement for any observation with a label in `decrement_on`.
    ///
    /// `increment_on` is evaluated first so
    /// `increment_on` and `decrement_on` should share labels.
    pub fn inc_dec_on_many<L: Eq + Send + 'static>(
        self,
        increment_on: Vec<L>,
        decrement_on: Vec<L>,
    ) -> GaugeAdapter<L> {
        self.inc_dec_on(increment_on, decrement_on)
    }

    pub fn inc_dec_by_predicates<L, PINC, PDEC>(
        self,
        predicate_incr: PINC,
        predicate_decr: PDEC,
    ) -> GaugeAdapter<L>
    where
        L: Eq + Send + 'static,
        PINC: Fn(&L) -> bool + Send + 'static,
        PDEC: Fn(&L) -> bool + Send + 'static,
    {
        self.inc_dec_on(
            LabelPredicate(predicate_incr),
            LabelPredicate(predicate_decr),
        )
    }

    pub fn set(&mut self, observed: ObservedValue) {
        if let Some(value) = self.value.take() {
            let next_value = if let Some(next_value) = self.next_value(Some(value), observed) {
                if let Some(ref buckets) = self.tracking {
                    match buckets.try_borrow_mut() {
                        Ok(mut borrowed) => borrowed.current_mut().update(next_value),
                        Err(_err) => crate::util::log_error("borrow mut in gauge::set failed!"),
                    }
                }
                next_value
            } else {
                value
            };

            self.value = Some(next_value);
        } else {
            self.value = self.next_value(None, observed).map(|next_value| {
                if let Some(ref buckets) = self.tracking {
                    match buckets.try_borrow_mut() {
                        Ok(mut borrowed) => borrowed.current_mut().update(next_value),
                        Err(_err) => crate::util::log_error("borrow mut in gauge::set failed!"),
                    }
                }

                next_value
            });
        }
    }

    pub fn get(&self) -> Option<i64> {
        self.value
    }

    fn next_value(&self, current: Option<i64>, observed: ObservedValue) -> Option<i64> {
        match observed {
            ObservedValue::ChangedBy(d) => current.map(|c| c + d).or_else(|| Some(d)),
            ObservedValue::Duration(time, unit) => {
                let value = super::duration_to_display_value(time, unit, self.display_time_unit);
                Some(value as i64)
            }
            x => x.convert_to_i64().or_else(|| current),
        }
    }

    fn put_values_into_snapshot(&self, into: &mut Snapshot) {
        if let Some(value) = self.value {
            let prefix = if self.group_values {
                into.items.push(("current".into(), value.into()));
                None
            } else {
                into.items.push((self.name.clone(), value.into()));
                Some(&*self.name)
            };
            if let Some(ref buckets) = self.tracking {
                match buckets.try_borrow_mut() {
                    Ok(mut borrowed) => BucketsStats::from_buckets(&mut *borrowed, Some(value))
                        .into_iter()
                        .for_each(|stats| stats.add_to_snapshot(into, prefix)),
                    Err(_err) => {
                        crate::util::log_error("borrow mut in gauge::put_snapshot failed!")
                    }
                }
            }
        }
    }
}

impl Instrument for Gauge {}

impl PutsSnapshot for Gauge {
    fn put_snapshot(&self, into: &mut Snapshot, descriptive: bool) {
        util::put_postfixed_descriptives(self, &self.name, into, descriptive);
        if self.group_values {
            let mut new_level = Snapshot::default();
            self.put_values_into_snapshot(&mut new_level);
            into.push(self.name.to_string(), new_level);
        } else {
            self.put_values_into_snapshot(into)
        };
    }
}

impl Updates for Gauge {
    fn update(&mut self, with: &Update) -> usize {
        match *with {
            Update::ObservationWithValue(v, _) => {
                self.set(v);
                1
            }
            _ => 0,
        }
    }
}

impl Descriptive for Gauge {
    fn title(&self) -> Option<&str> {
        self.title.as_deref()
    }

    fn description(&self) -> Option<&str> {
        self.description.as_deref()
    }
}

#[cfg(test)]
mod test;