#[macro_use]
extern crate log;
extern crate env_logger;
extern crate getopts;
extern crate hdrhistogram;
extern crate hotmic;
use getopts::Options;
use hdrhistogram::Histogram;
use hotmic::{Receiver, Sink};
use std::{
env,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::{Duration, Instant},
};
struct Generator {
stats: Sink<&'static str>,
t0: Option<u64>,
gauge: u64,
hist: Histogram<u64>,
done: Arc<AtomicBool>,
}
impl Generator {
fn new(stats: Sink<&'static str>, done: Arc<AtomicBool>) -> Generator {
Generator {
stats,
t0: None,
gauge: 0,
hist: Histogram::<u64>::new_with_bounds(1, u64::max_value(), 3).unwrap(),
done,
}
}
fn run(&mut self) {
loop {
if self.done.load(Ordering::Relaxed) {
break;
}
self.gauge += 1;
let t1 = self.stats.clock().raw();
if let Some(t0) = self.t0 {
let start = self.stats.clock().now();
let _ = self.stats.update_timing("ok", t0, t1);
let _ = self.stats.update_gauge("total", self.gauge);
let delta = self.stats.clock().now() - start;
self.hist.saturating_record(delta);
}
self.t0 = Some(t1);
}
}
}
impl Drop for Generator {
fn drop(&mut self) {
info!(
" sender latency: min: {:9} p50: {:9} p95: {:9} p99: {:9} p999: {:9} max: {:9}",
nanos_to_readable(self.hist.min()),
nanos_to_readable(self.hist.value_at_percentile(50.0)),
nanos_to_readable(self.hist.value_at_percentile(95.0)),
nanos_to_readable(self.hist.value_at_percentile(99.0)),
nanos_to_readable(self.hist.value_at_percentile(99.9)),
nanos_to_readable(self.hist.max())
);
}
}
fn print_usage(program: &str, opts: &Options) {
let brief = format!("Usage: {} [options]", program);
print!("{}", opts.usage(&brief));
}
pub fn opts() -> Options {
let mut opts = Options::new();
opts.optopt("d", "duration", "number of seconds to run the benchmark", "INTEGER");
opts.optopt("p", "producers", "number of producers", "INTEGER");
opts.optopt("c", "capacity", "maximum number of unprocessed items", "INTEGER");
opts.optopt("b", "batch-size", "maximum number of items in a batch", "INTEGER");
opts.optflag("h", "help", "print this help menu");
opts
}
fn main() {
env_logger::init();
let args: Vec<String> = env::args().collect();
let program = &args[0];
let opts = opts();
let matches = match opts.parse(&args[1..]) {
Ok(m) => m,
Err(f) => {
error!("Failed to parse command line args: {}", f);
return;
},
};
if matches.opt_present("help") {
print_usage(program, &opts);
return;
}
info!("hotmic benchmark");
let seconds = matches
.opt_str("duration")
.unwrap_or_else(|| "60".to_owned())
.parse()
.unwrap();
let capacity = matches
.opt_str("capacity")
.unwrap_or_else(|| "1024".to_owned())
.parse()
.unwrap();
let batch_size = matches
.opt_str("batch-size")
.unwrap_or_else(|| "256".to_owned())
.parse()
.unwrap();
let producers = matches
.opt_str("producers")
.unwrap_or_else(|| "1".to_owned())
.parse()
.unwrap();
info!("producers: {}", producers);
info!("capacity: {}", capacity);
info!("batch size: {}", batch_size);
let mut receiver = Receiver::builder().capacity(capacity).batch_size(batch_size).build();
let sink = receiver.get_sink();
let sink = sink.scoped(&["alpha", "pools", "primary"]);
info!("sink configured");
let done = Arc::new(AtomicBool::new(false));
let mut handles = Vec::new();
for _ in 0..producers {
let s = sink.clone();
let d = done.clone();
let handle = thread::spawn(move || {
Generator::new(s, d).run();
});
handles.push(handle);
}
let controller = receiver.get_controller();
thread::spawn(move || {
receiver.run();
});
let ok_key = "alpha.pools.primary.ok".to_owned();
let total_key = "alpha.pools.primary.total".to_owned();
let mut total = 0;
let mut t0 = Instant::now();
let mut snapshot_hist = Histogram::<u64>::new_with_bounds(1, u64::max_value(), 3).unwrap();
for _ in 0..seconds {
let t1 = Instant::now();
let mut turn_total = 0;
let start = Instant::now();
let snapshot = controller.get_snapshot();
let end = Instant::now();
snapshot_hist.saturating_record(duration_as_nanos(end - start) as u64);
let snapshot = snapshot.unwrap().into_simple();
if let Some(t) = snapshot.count(&ok_key) {
turn_total += t as u64;
}
if let Some(t) = snapshot.gauge(&total_key) {
turn_total += t;
}
let turn_delta = turn_total - total;
total = turn_total;
let rate = turn_delta as f64 / (duration_as_nanos(t1 - t0) / 1_000_000_000.0);
info!("sample ingest rate: {:.0} samples/sec", rate);
t0 = t1;
thread::sleep(Duration::new(1, 0));
}
info!("--------------------------------------------------------------------------------");
info!(" ingested samples total: {}", total);
info!(
"snapshot retrieval: min: {:9} p50: {:9} p95: {:9} p99: {:9} p999: {:9} max: {:9}",
nanos_to_readable(snapshot_hist.min()),
nanos_to_readable(snapshot_hist.value_at_percentile(50.0)),
nanos_to_readable(snapshot_hist.value_at_percentile(95.0)),
nanos_to_readable(snapshot_hist.value_at_percentile(99.0)),
nanos_to_readable(snapshot_hist.value_at_percentile(99.9)),
nanos_to_readable(snapshot_hist.max())
);
done.store(true, Ordering::SeqCst);
for handle in handles {
let _ = handle.join();
}
}
fn duration_as_nanos(d: Duration) -> f64 { (d.as_secs() as f64 * 1e9) + d.subsec_nanos() as f64 }
fn nanos_to_readable(t: u64) -> String {
let f = t as f64;
if f < 1_000.0 {
format!("{}ns", f)
} else if f < 1_000_000.0 {
format!("{:.0}μs", f / 1_000.0)
} else if f < 2_000_000_000.0 {
format!("{:.2}ms", f / 1_000_000.0)
} else {
format!("{:.3}s", f / 1_000_000_000.0)
}
}