1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
//! Publishes metric values from a source to a sink.
//!
//! Publishing can be done on request:
//! ```
//! use dipstick::*;
//!
//! let (sink, source) = aggregate();
//! publish(&source, &log("aggregated"), publish::all_stats);
//! ```
//!
//! Publishing can also be scheduled to run recurrently:
//! ```
//! use dipstick::*;
//! use std::time::Duration;
//!
//! let (sink, source) = aggregate();
//! let job = publish_every(Duration::from_millis(100), &source, &log("aggregated"), publish::all_stats);
//! // publish will go on until cancelled
//! job.cancel();
//! ```

use core::*;
use core::Kind::*;
use scores::{ScoreSnapshot, ScoreType};
use scores::ScoreType::*;
use std::fmt::Debug;

/// A trait to publish metrics.
///
/// Implementors receive a batch of score snapshots and are responsible for
/// forwarding them downstream. The supertrait bounds require every implementor
/// to be `Send`, `Sync` and `Debug`.
pub trait Publish: Send + Sync + Debug {
    /// Publish the provided metrics data downstream.
    ///
    /// `scores` is passed by value, so the implementor takes ownership of the
    /// snapshots and the caller cannot reuse them afterwards.
    fn publish(&self, scores: Vec<ScoreSnapshot>);
}

/// Defines and writes metrics from aggregated scores to the target chain.
///
/// If this is called repeatedly it can be a good idea to use the metric cache
/// to prevent new metrics from being created every time.
///
/// Type parameters:
/// - `E`: the statistics function selecting which scores get exported
///   (see the `Fn` bound on the `impl` blocks in this module).
/// - `M`: the metric type of the target chain.
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct Publisher<E, M> {
    /// Statistics selection function, boxed. Marked `Debug = "ignore"` so the
    /// derived `Debug` impl skips it (presumably because closures do not
    /// implement `Debug` - confirm against the `derivative` crate docs).
    #[derivative(Debug = "ignore")] statistics: Box<E>,
    /// Chain on which exported statistics are defined and written.
    target_chain: Chain<M>,
}

impl<E, M> Publisher<E, M>
where
    E: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static,
    M: Clone + Send + Sync + 'static,
{
    /// Define a new metrics publishing strategy, from a transformation
    /// function and a target metric chain.
    pub fn new(stat_fn: E, target_chain: Chain<M>) -> Self {
        Publisher {
            statistics: Box::new(stat_fn),
            target_chain,
        }
    }
}

impl<E, M> Publish for Publisher<E, M>
where
    M: Clone + Send + Sync + Debug + 'static,
    E: Fn(Kind, &str, ScoreType) -> Option<(Kind, Vec<&str>, Value)> + Send + Sync + 'static,
{
    /// Run every score of every snapshot entry through the statistics
    /// function and write the selected ones to a scope opened on the target
    /// chain, then flush that scope.
    ///
    /// A metric is defined on the target chain for each exported statistic,
    /// named by concatenating the name parts returned by the statistics
    /// function.
    fn publish(&self, snapshot: Vec<ScoreSnapshot>) {
        let scope = self.target_chain.open_scope(false);

        // An empty snapshot means no data was collected for this period;
        // the loop below then simply does not run.
        // TODO repeat previous frame min/max ?
        // TODO update some canary metric ?
        for metric in snapshot {
            let kind = metric.0;
            for score in metric.2 {
                let (stat_kind, name_parts, value) =
                    match (self.statistics)(kind, metric.1.as_ref(), score) {
                        Some(stat) => stat,
                        // The strategy chose not to export this score.
                        None => continue,
                    };
                let stat_name = name_parts.concat();
                let stat_metric = self.target_chain.define_metric(stat_kind, &stat_name, 1.0);
                scope.write(&stat_metric, value);
            }
        }

        // TODO parameterize whether to keep ad-hoc metrics after publish
        // source.cleanup();
        scope.flush()
    }
}

/// A predefined export strategy that reports every aggregated score of every metric.
///
/// Each exported stat is named after its metric, followed by a short suffix
/// identifying the score (`.count`, `.sum`, `.mean`, `.max`, `.min`, `.rate`).
/// Counts are exported as counters; max, min and rate as gauges; sum and mean
/// keep the kind of the originating metric. Mean and rate are rounded before
/// being converted to `Value`.
pub fn all_stats(kind: Kind, name: &str, score: ScoreType) -> Option<(Kind, Vec<&str>, Value)> {
    let (stat_kind, suffix, value) = match score {
        Count(hit) => (Counter, ".count", hit),
        Sum(sum) => (kind, ".sum", sum),
        Mean(mean) => (kind, ".mean", mean.round() as Value),
        Max(max) => (Gauge, ".max", max),
        Min(min) => (Gauge, ".min", min),
        Rate(rate) => (Gauge, ".rate", rate.round() as Value),
    };
    // This strategy never filters: every score yields a stat.
    Some((stat_kind, vec![name, suffix], value))
}

/// A predefined export strategy that reports one stat per metric:
/// - Markers export their hit count, as a counter
/// - every other kind exports its rounded mean, as a gauge
///
/// All other scores are dropped. Because each metric yields at most one stat,
/// names cannot collide and the exported stat reuses the metric's name as is.
pub fn average(kind: Kind, name: &str, score: ScoreType) -> Option<(Kind, Vec<&str>, Value)> {
    match (kind, score) {
        (Marker, Count(count)) => Some((Counter, vec![name], count)),
        // Must precede the mean arm: markers never export anything but their count.
        (Marker, _) => None,
        (_, Mean(avg)) => Some((Gauge, vec![name], avg.round() as Value)),
        _ => None,
    }
}

/// A predefined single-stat-per-metric export strategy:
/// - Timers and Counters each export their sums
/// - Markers each export their hit count
/// - Gauges each export their average
/// Since there is only one stat per metric, there is no risk of collision
/// and so exported stats copy their metric's name.
pub fn summary(kind: Kind, name: &str, score: ScoreType) -> Option<(Kind, Vec<&str>, Value)> {
    match kind {
        Marker => match score {
            Count(count) => Some((Counter, vec![name], count)),
            _ => None,
        },
        Counter | Timer => match score {
            Sum(sum) => Some((kind, vec![name], sum)),
            _ => None,
        },
        Gauge => match score {
            Mean(mean) => Some((Gauge, vec![name], mean.round() as Value)),
            _ => None,
        },
    }
}