1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
//! Dipstick metrics core types and traits.
//! This is mostly centered around the backend.
//! Application-facing types are in the `app` module.

use self::Kind::*;
use self::ScopeCmd::*;

use std::sync::Arc;
use std::time::Instant;

// TODO define an 'AsValue' trait + impl for supported number types, then drop 'num' crate
pub use num::ToPrimitive;

/// Base type for recorded metric values.
// TODO should this be f64? f32?
pub type Value = u64;

#[derive(Debug, Copy, Clone)]
/// A handle to the start time of a counter.
/// Wrapped so it may be changed safely later.
pub struct TimeHandle(Instant);

impl TimeHandle {

    /// Get a handle on current time.
    /// Used by the TimerMetric start_time() method.
    pub fn now() -> TimeHandle {
        TimeHandle(Instant::now())
    }

    /// Get the elapsed time in microseconds since TimeHandle was obtained.
    pub fn elapsed_us(self) -> Value {
        let duration = Instant::now() - self.0;
        duration.as_secs() * 1000000 + (duration.subsec_nanos() / 1000) as Value
    }

    /// Get the elapsed time in microseconds since TimeHandle was obtained.
    pub fn elapsed_ms(self) -> Value {
        let duration = Instant::now() - self.0;
        duration.as_secs() * 1000 + (duration.subsec_nanos() / 1000000) as Value
    }

}

/// Base type for sampling rate.
/// - 1.0 records everything
/// - 0.5 records one of two values
/// - 0.0 records nothing
/// The actual distribution (random, fixed-cycled, etc) depends on selected sampling method.
pub type Rate = f64;

/// Do not sample, use all data.
pub const FULL_SAMPLING_RATE: Rate = 1.0;

/// Used to differentiate between metric kinds in the backend.
#[derive(Debug, Copy, Clone)]
pub enum Kind {
    /// Handling one item at a time.
    Marker,
    /// Handling quantities or multiples.
    Counter,
    /// Reporting instant measurement of a resource at a point in time.
    Gauge,
    /// Measuring a time interval, internal to the app or provided by an external source.
    Timer,
}

/// Dynamic metric definition function.
/// Metrics can be defined from any thread, concurrently (Fn is Sync).
/// The resulting metrics themselves can be also be safely shared across threads (<M> is Send + Sync).
/// Concurrent usage of a metric is done using threaded scopes.
/// Shared concurrent scopes may be provided by some backends (aggregate).
pub type DefineMetricFn<M> = Arc<Fn(Kind, &str, Rate) -> M + Send + Sync>;

/// A function trait that opens a new metric capture scope.
pub type OpenScopeFn<M> = Arc<Fn(bool) -> ControlScopeFn<M> + Send + Sync>;

/// Returns a callback function to send commands to the metric scope.
/// Writes can be performed by passing Some((&Metric, Value))
/// Flushes can be performed by passing None
/// Used to write values to the scope or flush the scope buffer (if applicable).
/// Simple applications may use only one scope.
/// Complex applications may define a new scope fo each operation or request.
/// Scopes can be moved acrossed threads (Send) but are not required to be thread-safe (Sync).
/// Some implementations _may_ be 'Sync', otherwise queue()ing or threadlocal() can be used.
#[derive(Clone)]
pub struct ControlScopeFn<M> {
    flush_on_drop: bool,
    scope_fn: Arc<Fn(ScopeCmd<M>)>,
}

unsafe impl<M> Sync for ControlScopeFn<M> {}
unsafe impl<M> Send for ControlScopeFn<M> {}

/// An method dispatching command enum to manipulate metric scopes.
/// Replaces a potential `Writer` trait that would have methods `write` and `flush`.
/// Using a command pattern allows buffering, async queuing and inline definition of writers.
pub enum ScopeCmd<'a, M: 'a> {
    /// Write the value for the metric.
    /// Takes a reference to minimize overhead in single-threaded scenarios.
    Write(&'a M, Value),

    /// Flush the scope buffer, if applicable.
    Flush,
}

impl<M> ControlScopeFn<M> {
    /// Create a new metric scope based on the provided scope function.
    ///
    /// ```rust
    /// use dipstick::ControlScopeFn;
    /// let ref mut scope: ControlScopeFn<String> = ControlScopeFn::new(|_cmd| { /* match cmd {} */  });
    /// ```
    ///
    pub fn new<F>(scope_fn: F) -> Self
        where F: Fn(ScopeCmd<M>) + 'static
    {
        ControlScopeFn {
            flush_on_drop: true,
            scope_fn: Arc::new(scope_fn)
        }
    }

    /// Write a value to this scope.
    ///
    /// ```rust
    /// let ref mut scope = dipstick::to_log().open_scope(false);
    /// scope.write(&"counter".to_string(), 6);
    /// ```
    ///
    #[inline]
    pub fn write(&self, metric: &M, value: Value) {
        (self.scope_fn)(Write(metric, value))
    }

    /// Flush this scope, if buffered.
    ///
    /// ```rust
    /// let ref mut scope = dipstick::to_log().open_scope(true);
    /// scope.flush();
    /// ```
    ///
    #[inline]
    pub fn flush(&self) {
        (self.scope_fn)(Flush)
    }

    /// If scope is buffered, controls whether to flush the scope one last time when it is dropped.
    /// The default is true.
    ///
    /// ```rust
    /// let ref mut scope = dipstick::to_log().open_scope(true).flush_on_drop(false);
    /// ```
    ///
    pub fn flush_on_drop(mut self, enable: bool) -> Self {
        self.flush_on_drop = enable;
        self
    }
}

/// A pair of functions composing a twin "chain of command".
/// This is the building block for the metrics backend.
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct Chain<M> {
    #[derivative(Debug = "ignore")] define_metric_fn: DefineMetricFn<M>,

    #[derivative(Debug = "ignore")] scope_metric_fn: OpenScopeFn<M>,
}

impl<M> Chain<M> {
    /// Define a new metric.
    #[allow(unused_variables)]
    pub fn define_metric(&self, kind: Kind, name: &str, sampling: Rate) -> M {
        (self.define_metric_fn)(kind, name, sampling)
    }

    /// Open a new metric scope.
    /// Scope metrics allow an application to emit per-operation statistics,
    /// For example, producing a per-request performance log.
    ///
    /// Although the scope metrics can be predefined like in ['AppMetrics'], the application needs to
    /// create a scope that will be passed back when reporting scoped metric values.
    ///
    /// ```rust
    /// use dipstick::*;
    /// let scope_metrics = to_log();
    /// let request_counter = scope_metrics.counter("scope_counter");
    /// {
    ///     let ref mut request_scope = scope_metrics.open_scope(true);
    ///     request_counter.count(request_scope, 42);
    /// }
    /// ```
    ///
    pub fn open_scope(&self, buffered: bool) -> ControlScopeFn<M> {
        (self.scope_metric_fn)(buffered)
    }

    /// Open a buffered scope.
    #[inline]
    pub fn buffered_scope(&self) -> ControlScopeFn<M> {
        self.open_scope(true)
    }

    /// Open an unbuffered scope.
    #[inline]
    pub fn unbuffered_scope(&self) -> ControlScopeFn<M> {
        self.open_scope(false)
    }
}

impl<M: Send + Sync + Clone + 'static> Chain<M> {
    /// Create a new metric chain with the provided metric definition and scope creation functions.
    pub fn new<MF, WF>(make_metric: MF, make_scope: WF) -> Self
        where
            MF: Fn(Kind, &str, Rate) -> M + Send + Sync + 'static,
            WF: Fn(bool) -> ControlScopeFn<M> + Send + Sync + 'static,
    {
        Chain {
            // capture the provided closures in Arc to provide cheap clones
            define_metric_fn: Arc::new(make_metric),
            scope_metric_fn: Arc::new(make_scope),
        }
    }

    /// Get an event counter of the provided name.
    pub fn marker<AS: AsRef<str>>(&self, name: AS) -> ScopeMarker<M> {
        let metric = self.define_metric(Marker, name.as_ref(), 1.0);
        ScopeMarker { metric }
    }

    /// Get a counter of the provided name.
    pub fn counter<AS: AsRef<str>>(&self, name: AS) -> ScopeCounter<M> {
        let metric = self.define_metric(Counter, name.as_ref(), 1.0);
        ScopeCounter { metric }
    }

    /// Get a timer of the provided name.
    pub fn timer<AS: AsRef<str>>(&self, name: AS) -> ScopeTimer<M> {
        let metric = self.define_metric(Timer, name.as_ref(), 1.0);
        ScopeTimer { metric }
    }

    /// Get a gauge of the provided name.
    pub fn gauge<AS: AsRef<str>>(&self, name: AS) -> ScopeGauge<M> {
        let metric = self.define_metric(Gauge, name.as_ref(), 1.0);
        ScopeGauge { metric }
    }

    /// Intercept metric definition without changing the metric type.
    pub fn mod_metric<MF>(&self, mod_fn: MF) -> Chain<M>
        where
            MF: Fn(DefineMetricFn<M>) -> DefineMetricFn<M>,
    {
        Chain {
            define_metric_fn: mod_fn(self.define_metric_fn.clone()),
            scope_metric_fn: self.scope_metric_fn.clone(),
        }
    }

    /// Intercept both metric definition and scope creation, possibly changing the metric type.
    pub fn mod_both<MF, N>(&self, mod_fn: MF) -> Chain<N>
        where
            MF: Fn(DefineMetricFn<M>, OpenScopeFn<M>) -> (DefineMetricFn<N>, OpenScopeFn<N>),
            N: Clone + Send + Sync,
    {
        let (metric_fn, scope_fn) =
            mod_fn(self.define_metric_fn.clone(), self.scope_metric_fn.clone());
        Chain {
            define_metric_fn: metric_fn,
            scope_metric_fn: scope_fn,
        }
    }

    /// Intercept scope creation.
    pub fn mod_scope<MF>(&self, mod_fn: MF) -> Self
        where
            MF: Fn(OpenScopeFn<M>) -> OpenScopeFn<M>,
    {
        Chain {
            define_metric_fn: self.define_metric_fn.clone(),
            scope_metric_fn: mod_fn(self.scope_metric_fn.clone()),
        }
    }
}

/// A monotonic counter metric.
/// Since value is only ever increased by one, no value parameter is provided,
/// preventing programming errors.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ScopeMarker<M> {
    metric: M,
}

impl<M> ScopeMarker<M> {
    /// Record a single event occurence.
    #[inline]
    pub fn mark(&self, scope: &mut ControlScopeFn<M>) {
        scope.write(&self.metric, 1);
    }
}

/// A counter that sends values to the metrics backend
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ScopeCounter<M> {
    metric: M,
}

impl<M> ScopeCounter<M> {
    /// Record a value count.
    #[inline]
    pub fn count<V>(&self, scope: &mut ControlScopeFn<M>, count: V)
        where
            V: ToPrimitive,
    {
        scope.write(&self.metric, count.to_u64().unwrap());
    }
}

/// A gauge that sends values to the metrics backend
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ScopeGauge<M> {
    metric: M,
}

impl<M: Clone> ScopeGauge<M> {
    /// Record a value point for this gauge.
    #[inline]
    pub fn value<V>(&self, scope: &mut ControlScopeFn<M>, value: V)
        where
            V: ToPrimitive,
    {
        scope.write(&self.metric, value.to_u64().unwrap());
    }
}

/// A timer that sends values to the metrics backend
/// Timers can record time intervals in multiple ways :
/// - with the time! macro which wraps an expression or block with start() and stop() calls.
/// - with the time(Fn) method which wraps a closure with start() and stop() calls.
/// - with start() and stop() methods wrapping around the operation to time
/// - with the interval_us() method, providing an externally determined microsecond interval
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ScopeTimer<M> {
    metric: M,
}

impl<M: Clone> ScopeTimer<M> {
    /// Record a microsecond interval for this timer
    /// Can be used in place of start()/stop() if an external time interval source is used
    #[inline]
    pub fn interval_us<V>(&self, scope: &mut ControlScopeFn<M>, interval_us: V) -> V
        where
            V: ToPrimitive,
    {
        scope.write(&self.metric, interval_us.to_u64().unwrap());
        interval_us
    }

    /// Obtain a opaque handle to the current time.
    /// The handle is passed back to the stop() method to record a time interval.
    /// This is actually a convenience method to the TimeHandle::now()
    /// Beware, handles obtained here are not bound to this specific timer instance
    /// _for now_ but might be in the future for safety.
    /// If you require safe multi-timer handles, get them through TimeType::now()
    #[inline]
    pub fn start(&self) -> TimeHandle {
        TimeHandle::now()
    }

    /// Record the time elapsed since the start_time handle was obtained.
    /// This call can be performed multiple times using the same handle,
    /// reporting distinct time intervals each time.
    /// Returns the microsecond interval value that was recorded.
    #[inline]
    pub fn stop(&self, scope: &mut ControlScopeFn<M>, start_time: TimeHandle) -> u64 {
        let elapsed_us = start_time.elapsed_us();
        self.interval_us(scope, elapsed_us)
    }

    /// Record the time taken to execute the provided closure
    #[inline]
    pub fn time<F, R>(&self, scope: &mut ControlScopeFn<M>, operations: F) -> R
        where
            F: FnOnce() -> R,
    {
        let start_time = self.start();
        let value: R = operations();
        self.stop(scope, start_time);
        value
    }
}


#[cfg(feature = "bench")]
mod bench {

    use super::*;
    use test;

    #[bench]
    fn get_instant(b: &mut test::Bencher) {
        b.iter(|| test::black_box(TimeHandle::now()));
    }

}