1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
use core::*;
use pcg32;
use std::sync::Arc;
/// Wraps `sink` in a [`SamplingSink`] configured with `sampling_rate`.
///
/// The returned sink stores both values as-is; the rate is only consulted
/// later, when metrics are defined through the wrapper.
pub fn sample<M, S>(sampling_rate: Rate, sink: S) -> SamplingSink<S>
where
    S: Sink<M>,
    M: Send + Sync,
{
    SamplingSink {
        sampling_rate,
        next_sink: sink,
    }
}
/// Metric handle produced by `SamplingSink::new_metric`.
///
/// Pairs the metric defined on the wrapped sink with the integer sampling
/// rate that the scope closure checks before forwarding a write.
#[derive(Debug)]
pub struct Sampled<M> {
// Metric handle returned by the wrapped (next) sink.
target: M,
// Result of `pcg32::to_int_rate` for the sink's sampling rate; passed to
// `pcg32::accept_sample` on every write.
int_sampling_rate: u32,
}
/// Sink wrapper that holds a downstream sink together with the sampling
/// rate used for every metric defined through it.
pub struct SamplingSink<S> {
// Downstream sink that metric definitions and scopes are delegated to.
next_sink: S,
// Sampling rate handed to the downstream sink when defining metrics.
sampling_rate: Rate,
}
/// `Sink` implementation that forwards only the writes accepted by
/// `pcg32::accept_sample` to the wrapped sink.
impl<M, S> Sink<Sampled<M>> for SamplingSink<S>
where
    S: Sink<M>,
    M: 'static + Send + Sync,
{
    /// Defines a metric on the wrapped sink using this sink's own sampling
    /// rate and returns it wrapped in a [`Sampled`] handle.
    ///
    /// # Panics
    ///
    /// Panics if `sampling` differs from `FULL_SAMPLING_RATE`: the rate is
    /// fixed by this sink and must not be overridden by the caller.
    fn new_metric(&self, kind: Kind, name: &str, sampling: Rate) -> Sampled<M> {
        assert_eq!(sampling, FULL_SAMPLING_RATE, "Overriding previously set sampling rate");
        Sampled {
            target: self.next_sink.new_metric(kind, name, self.sampling_rate),
            int_sampling_rate: pcg32::to_int_rate(self.sampling_rate),
        }
    }

    /// Creates a scope on the wrapped sink and returns a closure that
    /// filters commands before passing them on.
    ///
    /// A `Scope::Write` is forwarded (with the inner metric handle) only when
    /// `pcg32::accept_sample` accepts it for the metric's integer rate. Any
    /// other command is forwarded as `Scope::Flush`.
    fn new_scope(&self) -> ScopeFn<Sampled<M>> {
        let next_scope = self.next_sink.new_scope();
        Arc::new(move |cmd| {
            if let Scope::Write(metric, value) = cmd {
                if pcg32::accept_sample(metric.int_sampling_rate) {
                    next_scope(Scope::Write(&metric.target, value))
                }
            } else {
                // Flush only when asked to: flushing after every write
                // (including rejected samples) would defeat any buffering
                // done by the downstream scope.
                next_scope(Scope::Flush)
            }
        })
    }
}