1#![allow(deprecated)]
2
3use clocksource::Clocksource;
4use common::{self, ControlMessage, Interest, Percentile};
5use config::Config;
6use controller::Controller;
7use data::{Allans, Counters, Gauges, Heatmaps, Histograms, Meters, Sample};
8use mio::{self, Events, Poll, PollOpt, Ready};
9use mio_extras::channel;
10use mpmc::Queue;
11use sender::Sender;
12use std::collections::HashSet;
13use std::fmt::Display;
14use std::hash::Hash;
15use std::sync::Arc;
16
17#[derive(Clone, Copy)]
19enum Token {
20 Control = 0,
21 Data = 1,
22}
23
24pub struct Receiver<T> {
26 window_time: u64,
27 window_duration: u64,
28 end_time: u64,
29 run_duration: u64,
30 config: Config<T>,
31 empty_queue: Arc<Queue<Vec<Sample<T>>>>,
32 data_rx: channel::Receiver<Vec<Sample<T>>>,
33 data_tx: channel::SyncSender<Vec<Sample<T>>>,
34 control_rx: channel::Receiver<ControlMessage<T>>,
35 control_tx: channel::SyncSender<ControlMessage<T>>,
36 allans: Allans<T>,
37 counters: Counters<T>,
38 gauges: Gauges<T>,
39 latency_histograms: Histograms<T>,
40 value_histograms: Histograms<T>,
41 meters: Meters<T>,
42 interests: HashSet<Interest<T>>,
43 taus: Vec<usize>,
44 percentiles: Vec<Percentile>,
45 latency_heatmaps: Heatmaps<T>,
46 value_heatmaps: Heatmaps<T>,
47 clocksource: Clocksource,
48 poll: Poll,
49}
50
51impl<T: Hash + Eq + Send + Clone + Display> Default for Receiver<T> {
52 fn default() -> Self {
54 Config::new().build()
55 }
56}
57
58impl<T: Hash + Eq + Send + Display + Clone> Receiver<T> {
59 pub fn new() -> Receiver<T> {
61 Default::default()
62 }
63
64 pub fn configured(config: Config<T>) -> Receiver<T> {
66 let (data_tx, data_rx) = channel::sync_channel::<Vec<Sample<T>>>(config.capacity);
67 let (control_tx, control_rx) = channel::sync_channel::<ControlMessage<T>>(config.capacity);
68 let empty_queue = Arc::new(Queue::with_capacity(config.capacity));
69 for _ in 0..config.capacity {
70 let _ = empty_queue.push(Vec::with_capacity(config.batch_size));
71 }
72
73 let clocksource = Clocksource::default();
74 let slices = config.duration * config.windows;
75
76 let start_time = clocksource.counter();
78 let window_duration = (config.duration as f64 * clocksource.frequency()) as u64;
79 let window_time = start_time + window_duration;
80 let run_duration = config.windows as u64 * window_duration;
81 let end_time = start_time + run_duration;
82
83 let max_tau = config.max_tau;
84
85 let poll = Poll::new().unwrap();
86 poll.register(
87 &data_rx,
88 mio::Token(Token::Data as usize),
89 Ready::readable(),
90 PollOpt::level(),
91 ).unwrap();
92 poll.register(
93 &control_rx,
94 mio::Token(Token::Control as usize),
95 Ready::readable(),
96 PollOpt::level(),
97 ).unwrap();
98
99 Receiver {
100 window_duration: window_duration,
101 window_time: window_time,
102 run_duration: run_duration,
103 end_time: end_time,
104 config: config,
105 empty_queue: empty_queue,
106 data_tx: data_tx,
107 data_rx: data_rx,
108 control_tx: control_tx,
109 control_rx: control_rx,
110 allans: Allans::new(max_tau),
111 counters: Counters::new(),
112 gauges: Gauges::new(),
113 latency_histograms: Histograms::new(),
114 value_histograms: Histograms::new(),
115 meters: Meters::new(),
116 interests: HashSet::new(),
117 taus: common::default_taus(),
118 percentiles: common::default_percentiles(),
119 latency_heatmaps: Heatmaps::new(slices, start_time),
120 value_heatmaps: Heatmaps::new(slices, start_time),
121 clocksource: clocksource,
122 poll: poll,
123 }
124 }
125
126 pub fn configure() -> Config<T> {
128 Config::default()
129 }
130
131 pub fn get_sender(&self) -> Sender<T> {
133 Sender::new(
134 Arc::clone(&self.empty_queue),
135 self.data_tx.clone(),
136 self.control_tx.clone(),
137 self.config.batch_size,
138 )
139 }
140
141 pub fn get_clocksource(&self) -> Clocksource {
143 self.clocksource.clone()
144 }
145
146 pub fn get_controller(&self) -> Controller<T> {
148 Controller::new(self.control_tx.clone())
149 }
150
151 pub fn add_interest(&mut self, interest: Interest<T>) {
153 match interest.clone() {
154 Interest::AllanDeviation(key) => {
155 self.allans.init(key);
156 }
157 Interest::Count(key) => {
158 self.counters.init(key);
159 }
160 Interest::Gauge(key) => {
161 self.gauges.init(key);
162 }
163 Interest::LatencyPercentile(key) => {
164 self.latency_histograms.init(key);
165 }
166 Interest::ValuePercentile(key) => {
167 self.value_histograms.init(key);
168 }
169 Interest::LatencyTrace(key, _) |
170 Interest::LatencyWaterfall(key, _) => {
171 self.latency_heatmaps.init(key);
172 }
173 Interest::ValueTrace(key, _) |
174 Interest::ValueWaterfall(key, _) => {
175 self.value_heatmaps.init(key);
176 }
177 }
178 self.interests.insert(interest);
179 }
180
181 pub fn remove_interest(&mut self, interest: &Interest<T>) {
183 match interest.clone() {
184 Interest::AllanDeviation(key) => {
185 self.allans.remove(key);
186 }
187 Interest::Count(key) => {
188 self.counters.remove(key);
189 }
190 Interest::Gauge(key) => {
191 self.gauges.remove(key);
192 }
193 Interest::LatencyPercentile(key) => {
194 self.latency_histograms.remove(key);
195 }
196 Interest::ValuePercentile(key) => {
197 self.value_histograms.remove(key);
198 }
199 Interest::LatencyTrace(key, _) |
200 Interest::LatencyWaterfall(key, _) => {
201 self.latency_heatmaps.remove(key);
202 }
203 Interest::ValueTrace(key, _) |
204 Interest::ValueWaterfall(key, _) => {
205 self.value_heatmaps.remove(key);
206 }
207 }
208 self.interests.remove(interest);
209 }
210
211 pub fn clear_heatmaps(&mut self) {
213 self.latency_heatmaps.clear();
214 self.value_heatmaps.clear();
215 }
216
217 pub fn run_once(&mut self) {
219 trace!("run once");
220
221 let window_time = self.window_time;
222
223 loop {
224 if self.check_elapsed(window_time) {
225 return;
226 }
227
228 let mut events = Events::with_capacity(1024);
229 self.poll.poll(&mut events, self.config.poll_delay).unwrap();
230 for event in events.iter() {
231 trace!("got: {} events", events.len());
232 let token = event.token().0;
233 if token == Token::Data as usize {
234 if let Ok(mut results) = self.data_rx.try_recv() {
235 for result in &results {
236 let t0 = self.clocksource.convert(result.start());
237 let t1 = self.clocksource.convert(result.stop());
238 let dt = t1 - t0;
239 self.allans.record(result.metric(), dt);
240 self.gauges.set(result.metric(), result.value());
241 self.counters.increment_by(result.metric(), result.count());
242 self.latency_histograms.increment(
243 result.metric(),
244 dt as u64,
245 );
246 self.value_histograms.increment(
247 result.metric(),
248 result.count(),
249 );
250 self.latency_heatmaps.increment(
251 result.metric(),
252 t0 as u64,
253 dt as u64,
254 );
255 self.value_heatmaps.increment(
256 result.metric(),
257 t0 as u64,
258 result.count(),
259 );
260 }
261 results.clear();
262 let _ = self.empty_queue.push(results);
263 trace!("finished processing");
264 }
265 } else if token == Token::Control as usize {
266 if let Ok(msg) = self.control_rx.try_recv() {
267 match msg {
268 ControlMessage::AddInterest(interest) => {
269 self.add_interest(interest);
270 }
271 ControlMessage::RemoveInterest(interest) => {
272 self.remove_interest(&interest);
273 }
274 ControlMessage::SnapshotMeters(tx) => {
275 let meters = self.clone_meters();
276 tx.send(meters).unwrap();
277 }
278 }
279 }
280 }
281 }
282 trace!("run complete");
283 }
284 }
285
286 fn check_elapsed(&mut self, t1: u64) -> bool {
289 let tsc = self.clocksource.counter();
290 if tsc >= t1 {
291 self.meters.clear();
292 for interest in &self.interests {
293 match *interest {
294 Interest::Count(ref key) => {
295 self.meters.set_count(
296 key.clone(),
297 self.counters.count(key.clone()),
298 );
299 }
300 Interest::Gauge(ref key) => {
301 self.meters.set_value(
302 key.clone(),
303 self.gauges.value(key.clone()),
304 );
305 }
306 Interest::LatencyPercentile(ref key) => {
307 for percentile in self.percentiles.clone() {
308 self.meters.set_latency_percentile(
309 key.clone(),
310 percentile.clone(),
311 self.latency_histograms
312 .percentile(key.clone(), percentile.1)
313 .unwrap_or(0),
314 );
315 }
316 }
317 Interest::ValuePercentile(ref key) => {
318 for percentile in self.percentiles.clone() {
319 self.meters.set_value_percentile(
320 key.clone(),
321 percentile.clone(),
322 (self.value_histograms
323 .percentile(key.clone(), percentile.1)
324 .unwrap_or(0) as f64 *
325 self.config.sample_rate) as
326 u64,
327 );
328 }
329 }
330 Interest::AllanDeviation(ref key) => {
331 for tau in self.taus.clone() {
332 if let Ok(adev) = self.allans.adev(key, tau) {
333 self.meters.set_adev(key.clone(), tau, adev);
334 }
335 }
336 }
337 _ => {}
338 }
339 }
340
341 self.latency_histograms.clear();
342 self.value_histograms.clear();
343 self.window_time += self.window_duration;
344 return true;
345 }
346 false
347 }
348
349 pub fn run(&mut self) {
351 let mut window = 0;
352 debug!("collection ready");
353 'outer: loop {
354 'inner: loop {
355 self.run_once();
356 window += 1;
357 if window >= self.config.windows {
358 break 'inner;
359 }
360 }
361
362 self.save_files();
363
364 if !self.config.service_mode {
365 break 'outer;
366 } else {
367 self.clear_heatmaps();
368 self.end_time += self.run_duration;
369 }
370 }
371 }
372
373 pub fn save_files(&mut self) {
375 for interest in self.interests.clone() {
376 match interest {
377 Interest::LatencyTrace(l, f) => {
378 self.latency_heatmaps.trace(l, f);
379 }
380 Interest::ValueTrace(l, f) => {
381 self.value_heatmaps.trace(l, f);
382 }
383 Interest::LatencyWaterfall(l, f) => {
384 self.latency_heatmaps.waterfall(l, f);
385 }
386 Interest::ValueWaterfall(l, f) => {
387 self.value_heatmaps.waterfall(l, f);
388 }
389 _ => {}
390 }
391 }
392 }
393
394 pub fn clone_meters(&self) -> Meters<T> {
396 self.meters.clone()
397 }
398}
399
400#[cfg(feature = "benchmark")]
401#[cfg(test)]
402mod benchmark {
403 extern crate test;
404 use super::*;
405
406 #[bench]
407 fn heavy_cycle(b: &mut test::Bencher) {
408 let mut receiver = Receiver::<String>::new();
409 receiver.add_interest(Interest::Count("test".to_owned()));
410 receiver.add_interest(Interest::LatencyPercentile("test".to_owned()));
411 receiver.add_interest(Interest::AllanDeviation("test".to_owned()));
412 b.iter(|| {
413 receiver.check_elapsed(0);
415 });
416 }
417
418 #[bench]
419 fn cheap_cycle(b: &mut test::Bencher) {
420 let mut receiver = Receiver::<String>::new();
421 receiver.add_interest(Interest::Count("test".to_owned()));
422 receiver.add_interest(Interest::LatencyPercentile("test".to_owned()));
423 receiver.add_interest(Interest::AllanDeviation("test".to_owned()));
424 b.iter(|| {
425 receiver.check_elapsed(u64::max_value());
427 });
428 }
429}