tic/
receiver.rs

1#![allow(deprecated)]
2
3use clocksource::Clocksource;
4use common::{self, ControlMessage, Interest, Percentile};
5use config::Config;
6use controller::Controller;
7use data::{Allans, Counters, Gauges, Heatmaps, Histograms, Meters, Sample};
8use mio::{self, Events, Poll, PollOpt, Ready};
9use mio_extras::channel;
10use mpmc::Queue;
11use sender::Sender;
12use std::collections::HashSet;
13use std::fmt::Display;
14use std::hash::Hash;
15use std::sync::Arc;
16
// token numbers registered with the `Poll` to tell the two channels apart
#[derive(Clone, Copy)]
enum Token {
    // readiness of the control message channel
    Control = 0,
    // readiness of the sample batch channel
    Data = 1,
}
23
24/// a `Receiver` processes incoming `Sample`s and generates stats
25pub struct Receiver<T> {
26    window_time: u64,
27    window_duration: u64,
28    end_time: u64,
29    run_duration: u64,
30    config: Config<T>,
31    empty_queue: Arc<Queue<Vec<Sample<T>>>>,
32    data_rx: channel::Receiver<Vec<Sample<T>>>,
33    data_tx: channel::SyncSender<Vec<Sample<T>>>,
34    control_rx: channel::Receiver<ControlMessage<T>>,
35    control_tx: channel::SyncSender<ControlMessage<T>>,
36    allans: Allans<T>,
37    counters: Counters<T>,
38    gauges: Gauges<T>,
39    latency_histograms: Histograms<T>,
40    value_histograms: Histograms<T>,
41    meters: Meters<T>,
42    interests: HashSet<Interest<T>>,
43    taus: Vec<usize>,
44    percentiles: Vec<Percentile>,
45    latency_heatmaps: Heatmaps<T>,
46    value_heatmaps: Heatmaps<T>,
47    clocksource: Clocksource,
48    poll: Poll,
49}
50
51impl<T: Hash + Eq + Send + Clone + Display> Default for Receiver<T> {
52    /// create a default `Receiver`
53    fn default() -> Self {
54        Config::new().build()
55    }
56}
57
58impl<T: Hash + Eq + Send + Display + Clone> Receiver<T> {
59    /// create a new `Receiver` using the defaults
60    pub fn new() -> Receiver<T> {
61        Default::default()
62    }
63
64    /// create a `Receiver` from a tic::Config
65    pub fn configured(config: Config<T>) -> Receiver<T> {
66        let (data_tx, data_rx) = channel::sync_channel::<Vec<Sample<T>>>(config.capacity);
67        let (control_tx, control_rx) = channel::sync_channel::<ControlMessage<T>>(config.capacity);
68        let empty_queue = Arc::new(Queue::with_capacity(config.capacity));
69        for _ in 0..config.capacity {
70            let _ = empty_queue.push(Vec::with_capacity(config.batch_size));
71        }
72
73        let clocksource = Clocksource::default();
74        let slices = config.duration * config.windows;
75
76        // calculate counter values for start, window, and end times
77        let start_time = clocksource.counter();
78        let window_duration = (config.duration as f64 * clocksource.frequency()) as u64;
79        let window_time = start_time + window_duration;
80        let run_duration = config.windows as u64 * window_duration;
81        let end_time = start_time + run_duration;
82
83        let max_tau = config.max_tau;
84
85        let poll = Poll::new().unwrap();
86        poll.register(
87            &data_rx,
88            mio::Token(Token::Data as usize),
89            Ready::readable(),
90            PollOpt::level(),
91        ).unwrap();
92        poll.register(
93            &control_rx,
94            mio::Token(Token::Control as usize),
95            Ready::readable(),
96            PollOpt::level(),
97        ).unwrap();
98
99        Receiver {
100            window_duration: window_duration,
101            window_time: window_time,
102            run_duration: run_duration,
103            end_time: end_time,
104            config: config,
105            empty_queue: empty_queue,
106            data_tx: data_tx,
107            data_rx: data_rx,
108            control_tx: control_tx,
109            control_rx: control_rx,
110            allans: Allans::new(max_tau),
111            counters: Counters::new(),
112            gauges: Gauges::new(),
113            latency_histograms: Histograms::new(),
114            value_histograms: Histograms::new(),
115            meters: Meters::new(),
116            interests: HashSet::new(),
117            taus: common::default_taus(),
118            percentiles: common::default_percentiles(),
119            latency_heatmaps: Heatmaps::new(slices, start_time),
120            value_heatmaps: Heatmaps::new(slices, start_time),
121            clocksource: clocksource,
122            poll: poll,
123        }
124    }
125
126    /// Create a new Config which can be used to build() a Receiver
127    pub fn configure() -> Config<T> {
128        Config::default()
129    }
130
131    /// returns a clone of the `Sender`
132    pub fn get_sender(&self) -> Sender<T> {
133        Sender::new(
134            Arc::clone(&self.empty_queue),
135            self.data_tx.clone(),
136            self.control_tx.clone(),
137            self.config.batch_size,
138        )
139    }
140
141    /// returns a clone of the `Clocksource`
142    pub fn get_clocksource(&self) -> Clocksource {
143        self.clocksource.clone()
144    }
145
146    /// creates a `Controller` attached to this `Receiver`
147    pub fn get_controller(&self) -> Controller<T> {
148        Controller::new(self.control_tx.clone())
149    }
150
151    /// register a stat for export
152    pub fn add_interest(&mut self, interest: Interest<T>) {
153        match interest.clone() {
154            Interest::AllanDeviation(key) => {
155                self.allans.init(key);
156            }
157            Interest::Count(key) => {
158                self.counters.init(key);
159            }
160            Interest::Gauge(key) => {
161                self.gauges.init(key);
162            }
163            Interest::LatencyPercentile(key) => {
164                self.latency_histograms.init(key);
165            }
166            Interest::ValuePercentile(key) => {
167                self.value_histograms.init(key);
168            }
169            Interest::LatencyTrace(key, _) |
170            Interest::LatencyWaterfall(key, _) => {
171                self.latency_heatmaps.init(key);
172            }
173            Interest::ValueTrace(key, _) |
174            Interest::ValueWaterfall(key, _) => {
175                self.value_heatmaps.init(key);
176            }
177        }
178        self.interests.insert(interest);
179    }
180
181    /// de-register a stat for export
182    pub fn remove_interest(&mut self, interest: &Interest<T>) {
183        match interest.clone() {
184            Interest::AllanDeviation(key) => {
185                self.allans.remove(key);
186            }
187            Interest::Count(key) => {
188                self.counters.remove(key);
189            }
190            Interest::Gauge(key) => {
191                self.gauges.remove(key);
192            }
193            Interest::LatencyPercentile(key) => {
194                self.latency_histograms.remove(key);
195            }
196            Interest::ValuePercentile(key) => {
197                self.value_histograms.remove(key);
198            }
199            Interest::LatencyTrace(key, _) |
200            Interest::LatencyWaterfall(key, _) => {
201                self.latency_heatmaps.remove(key);
202            }
203            Interest::ValueTrace(key, _) |
204            Interest::ValueWaterfall(key, _) => {
205                self.value_heatmaps.remove(key);
206            }
207        }
208        self.interests.remove(interest);
209    }
210
211    /// clear the heatmaps
212    pub fn clear_heatmaps(&mut self) {
213        self.latency_heatmaps.clear();
214        self.value_heatmaps.clear();
215    }
216
217    /// run the receive loop for one window
218    pub fn run_once(&mut self) {
219        trace!("run once");
220
221        let window_time = self.window_time;
222
223        loop {
224            if self.check_elapsed(window_time) {
225                return;
226            }
227
228            let mut events = Events::with_capacity(1024);
229            self.poll.poll(&mut events, self.config.poll_delay).unwrap();
230            for event in events.iter() {
231                trace!("got: {} events", events.len());
232                let token = event.token().0;
233                if token == Token::Data as usize {
234                    if let Ok(mut results) = self.data_rx.try_recv() {
235                        for result in &results {
236                            let t0 = self.clocksource.convert(result.start());
237                            let t1 = self.clocksource.convert(result.stop());
238                            let dt = t1 - t0;
239                            self.allans.record(result.metric(), dt);
240                            self.gauges.set(result.metric(), result.value());
241                            self.counters.increment_by(result.metric(), result.count());
242                            self.latency_histograms.increment(
243                                result.metric(),
244                                dt as u64,
245                            );
246                            self.value_histograms.increment(
247                                result.metric(),
248                                result.count(),
249                            );
250                            self.latency_heatmaps.increment(
251                                result.metric(),
252                                t0 as u64,
253                                dt as u64,
254                            );
255                            self.value_heatmaps.increment(
256                                result.metric(),
257                                t0 as u64,
258                                result.count(),
259                            );
260                        }
261                        results.clear();
262                        let _ = self.empty_queue.push(results);
263                        trace!("finished processing");
264                    }
265                } else if token == Token::Control as usize {
266                    if let Ok(msg) = self.control_rx.try_recv() {
267                        match msg {
268                            ControlMessage::AddInterest(interest) => {
269                                self.add_interest(interest);
270                            }
271                            ControlMessage::RemoveInterest(interest) => {
272                                self.remove_interest(&interest);
273                            }
274                            ControlMessage::SnapshotMeters(tx) => {
275                                let meters = self.clone_meters();
276                                tx.send(meters).unwrap();
277                            }
278                        }
279                    }
280                }
281            }
282            trace!("run complete");
283        }
284    }
285
286    // this function will check if the window is passed
287    // if it has, it will refresh the `Meters`
288    fn check_elapsed(&mut self, t1: u64) -> bool {
289        let tsc = self.clocksource.counter();
290        if tsc >= t1 {
291            self.meters.clear();
292            for interest in &self.interests {
293                match *interest {
294                    Interest::Count(ref key) => {
295                        self.meters.set_count(
296                            key.clone(),
297                            self.counters.count(key.clone()),
298                        );
299                    }
300                    Interest::Gauge(ref key) => {
301                        self.meters.set_value(
302                            key.clone(),
303                            self.gauges.value(key.clone()),
304                        );
305                    }
306                    Interest::LatencyPercentile(ref key) => {
307                        for percentile in self.percentiles.clone() {
308                            self.meters.set_latency_percentile(
309                                key.clone(),
310                                percentile.clone(),
311                                self.latency_histograms
312                                    .percentile(key.clone(), percentile.1)
313                                    .unwrap_or(0),
314                            );
315                        }
316                    }
317                    Interest::ValuePercentile(ref key) => {
318                        for percentile in self.percentiles.clone() {
319                            self.meters.set_value_percentile(
320                                key.clone(),
321                                percentile.clone(),
322                                (self.value_histograms
323                                     .percentile(key.clone(), percentile.1)
324                                     .unwrap_or(0) as f64 *
325                                     self.config.sample_rate) as
326                                    u64,
327                            );
328                        }
329                    }
330                    Interest::AllanDeviation(ref key) => {
331                        for tau in self.taus.clone() {
332                            if let Ok(adev) = self.allans.adev(key, tau) {
333                                self.meters.set_adev(key.clone(), tau, adev);
334                            }
335                        }
336                    }
337                    _ => {}
338                }
339            }
340
341            self.latency_histograms.clear();
342            self.value_histograms.clear();
343            self.window_time += self.window_duration;
344            return true;
345        }
346        false
347    }
348
349    /// run the receive loop for all windows, output waterfall and traces as requested
350    pub fn run(&mut self) {
351        let mut window = 0;
352        debug!("collection ready");
353        'outer: loop {
354            'inner: loop {
355                self.run_once();
356                window += 1;
357                if window >= self.config.windows {
358                    break 'inner;
359                }
360            }
361
362            self.save_files();
363
364            if !self.config.service_mode {
365                break 'outer;
366            } else {
367                self.clear_heatmaps();
368                self.end_time += self.run_duration;
369            }
370        }
371    }
372
373    /// save all artifacts
374    pub fn save_files(&mut self) {
375        for interest in self.interests.clone() {
376            match interest {
377                Interest::LatencyTrace(l, f) => {
378                    self.latency_heatmaps.trace(l, f);
379                }
380                Interest::ValueTrace(l, f) => {
381                    self.value_heatmaps.trace(l, f);
382                }
383                Interest::LatencyWaterfall(l, f) => {
384                    self.latency_heatmaps.waterfall(l, f);
385                }
386                Interest::ValueWaterfall(l, f) => {
387                    self.value_heatmaps.waterfall(l, f);
388                }
389                _ => {}
390            }
391        }
392    }
393
394    /// return a clone of the raw `Meters`
395    pub fn clone_meters(&self) -> Meters<T> {
396        self.meters.clone()
397    }
398}
399
#[cfg(feature = "benchmark")]
#[cfg(test)]
mod benchmark {
    extern crate test;
    use super::*;

    // builds a default receiver with the three interests both benchmarks use
    fn receiver_with_interests() -> Receiver<String> {
        let mut receiver = Receiver::<String>::new();
        receiver.add_interest(Interest::Count("test".to_owned()));
        receiver.add_interest(Interest::LatencyPercentile("test".to_owned()));
        receiver.add_interest(Interest::AllanDeviation("test".to_owned()));
        receiver
    }

    // a boundary of 0 has always elapsed, so every iteration rebuilds the meters
    #[bench]
    fn heavy_cycle(b: &mut test::Bencher) {
        let mut receiver = receiver_with_interests();
        b.iter(|| {
            receiver.check_elapsed(0);
        });
    }

    // the maximum boundary keeps the window open, so each iteration only
    // reads the clock and compares
    #[bench]
    fn cheap_cycle(b: &mut test::Bencher) {
        let mut receiver = receiver_with_interests();
        b.iter(|| {
            receiver.check_elapsed(u64::max_value());
        });
    }
}