1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
#![doc = include_str!("../README.md")]

use std::fs::File;
use std::io::Write;
use std::sync::Mutex;

pub mod moments;
mod sensors;

pub use sensors::Counter;
pub use sensors::Gauge;
pub use sensors::Moments;

////////////////////////////////////////////// Sensor //////////////////////////////////////////////

/// [Sensor] is the core type of the system.  A sensor must be algebraic to be included in this
/// library.  An algebraic sensor allows one to take two readings, one on each side of a bucket,
/// and compute the bucket with a single subtraction.
///
/// Both methods take `&'static self`, so they can only be called on sensors that live for the
/// whole program (e.g. sensors declared as `static` items).
pub trait Sensor {
    /// The type of a sensor reading, i.e. the value returned by [Sensor::read].
    type Reading;

    /// Every sensor has a label.  This is a UTF-8 string.  It must be static because sensors are
    /// meant to be instantiated statically as well, and having the constraint here enforces that.
    fn label(&'static self) -> &'static str;
    /// Return a linearizable view of the sensor.
    fn read(&'static self) -> Self::Reading;
}

////////////////////////////////////////// SensorRegistry //////////////////////////////////////////

/// [SensorRegistry] refers to a set of sensors of the same type.
///
/// The registry only stores `&'static` references to its sensors; it never owns them.
struct SensorRegistry<S: Sensor + 'static> {
    /// The registered sensors, in registration order.  Guarded by a mutex so that registration
    /// and emission can happen through a shared reference.
    sensors: Mutex<Vec<&'static S>>,
    /// Clicked once for every sensor registered.
    register: &'static Counter,
    /// Clicked once for every sensor reading emitted successfully.
    emit: &'static Counter,
    /// Clicked once for every sensor reading that failed to emit.
    err: &'static Counter,
}

impl<S: Sensor + 'static> SensorRegistry<S> {
    /// Create a new, empty [SensorRegistry] using the three counters for internal
    /// instrumentation.  We don't define these counters here, so that each registry can define
    /// its own counters and get ground truth about the registry.
    ///
    /// * `register` is clicked once per registered sensor.
    /// * `emit` is clicked once per successfully emitted sensor.
    /// * `err` is clicked once per sensor whose emission returned an error.
    pub fn new(register: &'static Counter, emit: &'static Counter, err: &'static Counter) -> Self {
        Self {
            sensors: Mutex::new(Vec::new()),
            register,
            emit,
            err,
        }
    }

    /// Unconditionally register the sensor with the sensor library.  No de-duplication is
    /// performed: registering the same sensor twice stores it twice.
    ///
    /// # Panics
    ///
    /// Panics if the registry's mutex is poisoned.
    pub fn register(&self, sensor: &'static S) {
        // The guard is a temporary of this statement, so the lock is released before the
        // instrumentation counter is touched.
        self.sensors.lock().unwrap().push(sensor);
        self.register.click();
    }

    /// Emit readings for all registered sensors by calling `emit(emitter, sensor, now)` once per
    /// sensor, in registration order.
    ///
    /// Every sensor is attempted even when an earlier one fails.  Each success clicks the
    /// registry's `emit` counter and each failure clicks its `err` counter.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by `emit`; later errors are counted but discarded.
    ///
    /// # Panics
    ///
    /// Panics if the registry's mutex is poisoned.
    fn emit<EM: Emitter<Error = ERR>, ERR>(
        &self,
        emitter: &mut EM,
        emit: &dyn Fn(&mut EM, &'static S, u64) -> Result<(), ERR>,
        now: u64,
    ) -> Result<(), ERR> {
        // Snapshot the sensor list under a single lock acquisition and release the lock before
        // invoking the callback, so emission never runs while the registry is locked.
        let sensors: Vec<&'static S> = self.sensors.lock().unwrap().clone();
        let mut result = Ok(());
        for sensor in sensors {
            match emit(emitter, sensor, now) {
                Ok(()) => self.emit.click(),
                Err(e) => {
                    self.err.click();
                    // Keep only the first failure; keep going so every sensor gets its chance.
                    if result.is_ok() {
                        result = Err(e);
                    }
                }
            }
        }
        result
    }
}

///////////////////////////////////////////// Collector ////////////////////////////////////////////

/// Collect and register sensors of all types.  One registry per sensor type.
pub struct Collector {
    /// Registry of every registered [Counter].
    counters: SensorRegistry<Counter>,
    /// Registry of every registered [Gauge].
    gauges: SensorRegistry<Gauge>,
    /// Registry of every registered [Moments].
    moments: SensorRegistry<Moments>,
}

// Internal instrumentation for [Collector].  `Collector::new` hands these to its registries and
// also registers every one of them with the collector, so they are emitted like any other
// counter.

// Clicked once per sensor registered, split by sensor type.
static COLLECTOR_REGISTER_COUNTER: Counter = Counter::new("biometrics.collector.register.counter");
static COLLECTOR_REGISTER_GAUGE: Counter = Counter::new("biometrics.collector.register.gauge");
static COLLECTOR_REGISTER_MOMENTS: Counter = Counter::new("biometrics.collector.register.moments");
// Clicked once per sensor emitted successfully, split by sensor type.
static COLLECTOR_EMIT_COUNTER: Counter = Counter::new("biometrics.collector.emit.counter");
static COLLECTOR_EMIT_GAUGE: Counter = Counter::new("biometrics.collector.emit.gauge");
static COLLECTOR_EMIT_MOMENTS: Counter = Counter::new("biometrics.collector.emit.moments");
// Clicked once per failed emission; shared by all three registries.
static COLLECTOR_EMIT_FAILURE: Counter = Counter::new("biometrics.collector.emit.failure");
// Registered by `Collector::new` but not clicked anywhere in the code visible here.
// NOTE(review): presumably reserved for failures to obtain a timestamp -- confirm against callers.
static COLLECTOR_TIME_FAILURE: Counter = Counter::new("biometrics.collector.time.failure");

impl Collector {
    /// Get a new [Collector].  Its three registries report into the module-level `COLLECTOR_*`
    /// counters for easy monitoring, and those counters are themselves registered with the new
    /// collector so that they show up in its output.
    pub fn new() -> Self {
        let counters = SensorRegistry::new(
            &COLLECTOR_REGISTER_COUNTER,
            &COLLECTOR_EMIT_COUNTER,
            &COLLECTOR_EMIT_FAILURE,
        );
        let gauges = SensorRegistry::new(
            &COLLECTOR_REGISTER_GAUGE,
            &COLLECTOR_EMIT_GAUGE,
            &COLLECTOR_EMIT_FAILURE,
        );
        let moments = SensorRegistry::new(
            &COLLECTOR_REGISTER_MOMENTS,
            &COLLECTOR_EMIT_MOMENTS,
            &COLLECTOR_EMIT_FAILURE,
        );
        let this = Self {
            counters,
            gauges,
            moments,
        };
        // The collector instruments itself: register its own counters, in a fixed order.
        let internal: [&'static Counter; 8] = [
            &COLLECTOR_REGISTER_COUNTER,
            &COLLECTOR_REGISTER_GAUGE,
            &COLLECTOR_REGISTER_MOMENTS,
            &COLLECTOR_EMIT_COUNTER,
            &COLLECTOR_EMIT_GAUGE,
            &COLLECTOR_EMIT_MOMENTS,
            &COLLECTOR_EMIT_FAILURE,
            &COLLECTOR_TIME_FAILURE,
        ];
        for counter in internal {
            this.register_counter(counter);
        }
        this
    }

    /// Add `counter` to the set of counters this Collector emits.
    pub fn register_counter(&self, counter: &'static Counter) {
        self.counters.register(counter);
    }

    /// Add `gauge` to the set of gauges this Collector emits.
    pub fn register_gauge(&self, gauge: &'static Gauge) {
        self.gauges.register(gauge);
    }

    /// Add `moments` to the set of moments this Collector emits.
    pub fn register_moments(&self, moments: &'static Moments) {
        self.moments.register(moments);
    }

    /// Output every registered sensor through `emitter`, passing `now` along with each reading.
    ///
    /// Counters are emitted first, then gauges, then moments.  All three groups are always
    /// attempted; the returned error, if any, is the first one encountered in that order.
    pub fn emit<EM: Emitter<Error = ERR>, ERR: std::fmt::Debug>(
        &self,
        emitter: &mut EM,
        now: u64,
    ) -> Result<(), ERR> {
        // Evaluate all three eagerly so that a failure in one group does not skip the others.
        let counters = self.counters.emit(emitter, &EM::emit_counter, now);
        let gauges = self.gauges.emit(emitter, &EM::emit_gauge, now);
        let moments = self.moments.emit(emitter, &EM::emit_moments, now);
        counters.and(gauges).and(moments)
    }
}

impl Default for Collector {
    fn default() -> Self {
        Self::new()
    }
}

////////////////////////////////////////////// Emitter /////////////////////////////////////////////

/// [Emitter] outputs the sensor state via I/O.
///
/// There is one method per sensor type.  Each receives the sensor to emit together with the
/// timestamp supplied by the caller of the emission.
pub trait Emitter {
    /// The type of error this emitter returns.
    type Error;

    /// Emit the provided [Counter], tagged with the timestamp `now_millis`.
    fn emit_counter(
        &mut self,
        counter: &'static Counter,
        now_millis: u64,
    ) -> Result<(), Self::Error>;
    /// Emit the provided [Gauge], tagged with the timestamp `now_millis`.
    fn emit_gauge(&mut self, gauge: &'static Gauge, now_millis: u64) -> Result<(), Self::Error>;
    /// Emit the provided [Moments], tagged with the timestamp `now_millis`.
    fn emit_moments(
        &mut self,
        moments: &'static Moments,
        now_millis: u64,
    ) -> Result<(), Self::Error>;
}

///////////////////////////////////////// PlainTextEmitter /////////////////////////////////////////

/// An emitter that puts readings one-per-line.
pub struct PlainTextEmitter {
    /// The file every reading is written to.
    output: File,
}

impl PlainTextEmitter {
    /// Create a new plain-text emitter.
    pub fn new(output: File) -> Self {
        Self { output }
    }
}

impl Emitter for PlainTextEmitter {
    type Error = std::io::Error;

    /// Write one line of the form `<label> <now> <reading>` for `counter`.
    ///
    /// The line is formatted into memory first and then handed to the file with a single
    /// `write_all`, rather than formatting straight into the unbuffered file, which would issue
    /// a separate write for every fragment of the line.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported while formatting or writing the line.
    fn emit_counter(&mut self, counter: &'static Counter, now: u64) -> Result<(), std::io::Error> {
        let mut line: Vec<u8> = Vec::new();
        writeln!(line, "{} {} {}", counter.label(), now, counter.read())?;
        self.output.write_all(&line)
    }

    /// Write one line of the form `<label> <now> <reading>` for `gauge`.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported while formatting or writing the line.
    fn emit_gauge(&mut self, gauge: &'static Gauge, now: u64) -> Result<(), std::io::Error> {
        let mut line: Vec<u8> = Vec::new();
        writeln!(line, "{} {} {}", gauge.label(), now, gauge.read())?;
        self.output.write_all(&line)
    }

    /// Write one line of the form `<label> <now> <n> <m1> <m2> <m3> <m4>` for `moments`.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported while formatting or writing the line.
    fn emit_moments(&mut self, moments: &'static Moments, now: u64) -> Result<(), std::io::Error> {
        let label = moments.label();
        // Take a single reading so that all five fields on the line come from the same snapshot.
        let reading = moments.read();
        let mut line: Vec<u8> = Vec::new();
        writeln!(
            line,
            "{} {} {} {} {} {} {}",
            label, now, reading.n, reading.m1, reading.m2, reading.m3, reading.m4,
        )?;
        self.output.write_all(&line)
    }
}

/////////////////////////////////////////////// tests //////////////////////////////////////////////

#[cfg(test)]
mod tests {
    use super::*;

    // Smoke test: constructing a Collector, which also registers the internal COLLECTOR_*
    // counters with it, must complete without panicking.
    #[test]
    fn collector_new() {
        let _: Collector = Collector::new();
    }
}