use std::sync::Arc;
use std::time::Instant;
use std::fmt::Display;
use std::hash::Hash;
use std::net::ToSocketAddrs;
use mpmc::Queue;
use tiny_http::{Server, Response, Request};
use config::Config;
use meters::Meters;
use histograms::Histograms;
use heatmaps::Heatmaps;
use sample::Sample;
/// A request for the receiver to publish or render something about one metric.
///
/// `T` is the caller's metric identifier type. Interests are registered with
/// `Receiver::add_interest` and acted on by `Receiver::run_once` and
/// `Receiver::run`.
#[derive(Clone)]
pub enum Interest<T> {
/// At the end of every window, publish a `<metric>_count` meter taken from
/// the heatmaps' per-metric count.
Count(T),
/// At the end of every window, publish one `<metric>_<label>_nanoseconds`
/// meter per configured percentile, taken from the histograms.
Percentile(T),
/// After all windows have run, pass the metric and the string to
/// `Heatmaps::metric_trace`. The string is presumably an output file
/// path -- confirm against `Heatmaps`.
Trace(T, String),
/// After all windows have run, pass the metric and the string to
/// `Heatmaps::metric_waterfall`. The string is presumably an output file
/// path -- confirm against `Heatmaps`.
Waterfall(T, String),
}
/// A labelled percentile to report.
///
/// Field 0 is the label embedded in meter names (e.g. `p99`); field 1 is the
/// percentile handed to the histogram queries. The defaults built in this
/// file range from `0.0` (`min`) to `100.0` (`max`).
#[derive(Clone)]
pub struct Percentile(String, f64);
/// Producer-side handle used to submit samples to a `Receiver`.
///
/// Obtained from `Receiver::get_sender`; cloning a `Sender` clones the `Arc`,
/// so every clone feeds the same underlying queue.
#[derive(Clone)]
pub struct Sender<T> {
// Queue shared with the `Receiver` that created this sender.
queue: Arc<Queue<Sample<T>>>,
}
impl<T: Hash + Eq + Send + Clone> Sender<T> {
    /// Submits `sample` to the receiver's queue.
    ///
    /// The result of `Queue::push` is returned unchanged: `Ok(())` when the
    /// sample was accepted, `Err(sample)` handing the sample back otherwise
    /// (presumably because the queue is full -- confirm against `mpmc::Queue`).
    pub fn send(&self, sample: Sample<T>) -> Result<(), Sample<T>> {
        let queue = &self.queue;
        queue.push(sample)
    }
}
/// Consumer side of the stats pipeline: drains samples from the shared queue,
/// aggregates them into histograms and heatmaps, publishes meters at the end
/// of every window and optionally serves them over HTTP.
pub struct Receiver<T> {
// Settings; this file reads `duration`, `windows`, `http_listen`,
// `trace_file` and `waterfall_file` from it.
config: Config<T>,
// Sample queue shared with every `Sender` handed out by `get_sender`.
queue: Arc<Queue<Sample<T>>>,
// Latency histograms; updated per sample and cleared at the end of each window.
histograms: Histograms<T>,
// Named values published at the end of each window and served over HTTP.
meters: Meters<String>,
// Interests registered through `add_interest`.
interests: Vec<Interest<T>>,
// Percentiles reported for percentile interests and for the all-samples totals.
percentiles: Vec<Percentile>,
// Heatmaps sized with `duration * windows` slices; updated per sample and
// not cleared anywhere in this file.
heatmaps: Heatmaps<T>,
}
impl<T: Hash + Eq + Send + Clone + Display> Default for Receiver<T> {
fn default() -> Self {
Config::new().build()
}
}
impl<T: Hash + Eq + Send + Display + Clone> Receiver<T> {
/// Creates a receiver with the default configuration.
///
/// Equivalent to `Receiver::default()`.
pub fn new() -> Receiver<T> {
    <Receiver<T> as Default>::default()
}
/// Creates a receiver from an explicit configuration.
///
/// The sample queue is created with a capacity of 8, the heatmaps are sized
/// with `config.duration * config.windows` slices, no interests are
/// registered yet and the default percentile table is installed.
pub fn configured(config: Config<T>) -> Receiver<T> {
    let sample_queue = Queue::<Sample<T>>::with_capacity(8);
    // Read the sizing values before `config` is moved into the struct.
    let total_slices = config.duration * config.windows;
    Receiver {
        config: config,
        queue: Arc::new(sample_queue),
        histograms: Histograms::new(),
        meters: Meters::new(),
        interests: Vec::new(),
        percentiles: default_percentiles(),
        heatmaps: Heatmaps::new(total_slices),
    }
}
pub fn configure() -> Config<T> {
Config::default()
}
pub fn get_sender(&self) -> Sender<T> {
Sender { queue: self.queue.clone() }
}
/// Registers an interest; it is appended after those already registered.
///
/// No de-duplication is performed.
pub fn add_interest(&mut self, interest: Interest<T>) {
    let registered = &mut self.interests;
    registered.push(interest);
}
/// Collects samples for one window, then publishes that window's meters.
///
/// Until `config.duration` whole seconds have elapsed, each loop iteration
/// pops at most one sample from the queue (recording it in the histograms
/// and heatmaps) and then gives the optional HTTP `server` a chance to
/// answer one pending request. The loop contains no explicit sleep.
///
/// When the window ends the meters are refreshed and the histograms are
/// cleared; the heatmaps are left untouched.
pub fn run_once(&mut self, server: &Option<Server>) {
    let duration = self.config.duration;
    let t0 = Instant::now();
    loop {
        if let Some(result) = self.queue.pop() {
            self.histograms.increment(result.metric(), result.duration());
            self.heatmaps.increment(result.metric(), result.start(), result.duration());
        }
        self.try_handle_http(server);
        if t0.elapsed().as_secs() >= duration as u64 {
            break;
        }
    }
    self.publish_window_meters();
}

/// Refreshes every meter from the current aggregates and then clears the
/// histograms so the next window starts empty.
fn publish_window_meters(&mut self) {
    // `interests`, `percentiles`, `meters`, `histograms` and `heatmaps` are
    // distinct fields, so the lists can be iterated by reference while the
    // meters are updated; only the metric key handed to a query is cloned.
    for interest in &self.interests {
        match *interest {
            Interest::Count(ref l) => {
                self.meters.set(format!("{}_count", l), self.heatmaps.metric_count(l.clone()));
            }
            Interest::Percentile(ref l) => {
                for percentile in &self.percentiles {
                    self.meters.set(
                        format!("{}_{}_nanoseconds", l, percentile.0),
                        self.histograms
                            .metric_percentile(l.clone(), percentile.1)
                            // A percentile that cannot be computed is published as 0.
                            .unwrap_or(0),
                    );
                }
            }
            // Traces and waterfalls are rendered once, at the end of `run`.
            Interest::Trace(_, _) |
            Interest::Waterfall(_, _) => {}
        }
    }
    self.meters.set("all_samples_total".to_owned(), self.heatmaps.total_count());
    for percentile in &self.percentiles {
        self.meters.set(
            format!("all_samples_{}_nanoseconds", percentile.0),
            self.histograms
                .total_percentile(percentile.1)
                .unwrap_or(0),
        );
    }
    self.histograms.clear();
}
/// Runs the full collection: starts the optional HTTP listener, executes
/// `config.windows` windows (always at least one), then renders the
/// per-metric and total traces and waterfalls.
///
/// Panics if the configured listen address cannot be used (see
/// `start_listener`).
pub fn run(&mut self) {
    let server = self.start_listener(self.config.http_listen.clone());
    debug!("collection ready");

    // The first window always runs, even when `windows` is zero.
    self.run_once(&server);
    let mut completed = 1;
    while completed < self.config.windows {
        self.run_once(&server);
        completed += 1;
    }

    // Per-metric renders requested through interests.
    let requested = self.interests.clone();
    for interest in requested {
        match interest {
            Interest::Trace(metric, target) => {
                self.heatmaps.metric_trace(metric, target);
            }
            Interest::Waterfall(metric, target) => {
                self.heatmaps.metric_waterfall(metric, target);
            }
            // Already published at the end of every window.
            Interest::Count(_) |
            Interest::Percentile(_) => {}
        }
    }

    self.save_trace();
    self.save_waterfall();
}
/// Writes the all-samples trace when `config.trace_file` is set; otherwise
/// does nothing.
pub fn save_trace(&mut self) {
    let file = match self.config.trace_file.clone() {
        Some(file) => file,
        None => return,
    };
    debug!("saving trace file");
    self.heatmaps.total_trace(file);
}
/// Renders the all-samples waterfall when `config.waterfall_file` is set;
/// otherwise does nothing.
pub fn save_waterfall(&mut self) {
    let file = match self.config.waterfall_file.clone() {
        Some(file) => file,
        None => return,
    };
    debug!("stats: saving waterfall render");
    self.heatmaps.total_waterfall(file);
}
/// Starts the HTTP listener on `listen`, or returns `None` when no address
/// is configured.
///
/// # Panics
///
/// Panics if the address cannot be resolved, resolves to no socket address,
/// or the server cannot be started on it.
fn start_listener(&self, listen: Option<String>) -> Option<Server> {
    let address = match listen {
        Some(address) => address,
        None => return None,
    };
    // Only the first resolved address is used.
    let http_socket = address.to_socket_addrs().unwrap().next().unwrap();
    debug!("stats: starting HTTP listener");
    let server = Server::http(http_socket).unwrap();
    Some(server)
}
/// Answers at most one pending HTTP request, if a server is running.
///
/// Does nothing when there is no server, when `try_recv` reports an error,
/// or when no request is waiting.
fn try_handle_http(&self, server: &Option<Server>) {
    let polled = match *server {
        Some(ref s) => s.try_recv(),
        None => return,
    };
    match polled {
        Ok(Some(request)) => {
            debug!("stats: handle http request");
            self.handle_http(request);
        }
        _ => {}
    }
}
/// Answers one HTTP request with a plain-text or JSON view of the stats.
///
/// Routes, matched against the exact request URL:
///
/// * `/histogram` -- one `value count` line per non-empty bucket of the
///   total histogram.
/// * `/vars` or `/metrics` -- one `name value` line per meter.
/// * anything else -- all meters as a single JSON object.
///
/// A failure to send the response is ignored on purpose: a client that went
/// away must not interrupt collection.
fn handle_http(&self, request: Request) {
    let mut output = String::new();
    match request.url() {
        "/histogram" => {
            for bucket in &self.histograms.total {
                if bucket.count() > 0 {
                    output.push_str(&format!("{} {}\n", bucket.value(), bucket.count()));
                }
            }
        }
        "/vars" | "/metrics" => {
            for (stat, value) in &self.meters.data {
                output.push_str(&format!("{} {}\n", stat, value));
            }
        }
        _ => {
            output.push('{');
            // Write the separator *between* members only: a trailing comma
            // before `}` is not valid JSON and strict parsers reject it.
            let mut first = true;
            for (stat, value) in &self.meters.data {
                if !first {
                    output.push(',');
                }
                first = false;
                // NOTE(review): meter names are emitted without JSON string
                // escaping; a name containing `"` or `\` would still break
                // the document.
                output.push_str(&format!("\"{}\":{}", stat, value));
            }
            output.push('}');
        }
    }
    let response = Response::from_string(output);
    let _ = request.respond(response);
}
}
/// Builds the default percentile table: `min`, `p50`, `p75`, `p90`, `p95`,
/// `p99`, `p999`, `p9999` and `max`, in that order.
fn default_percentiles() -> Vec<Percentile> {
    // (label used in meter names, percentile passed to the histograms)
    let table: [(&str, f64); 9] = [
        ("min", 0.0),
        ("p50", 50.0),
        ("p75", 75.0),
        ("p90", 90.0),
        ("p95", 95.0),
        ("p99", 99.0),
        ("p999", 99.9),
        ("p9999", 99.99),
        ("max", 100.0),
    ];
    table
        .iter()
        .map(|&(label, rank)| Percentile(label.to_owned(), rank))
        .collect()
}