#![warn(missing_copy_implementations, missing_debug_implementations, missing_docs)]
use std::ops::{Range};
use std::time::{Duration};
#[macro_export]
macro_rules! queuecheck_bench_latency {
($pairs:expr, $producers:expr, $consumers:expr, $produce:expr, $consume:expr) => ({
use std::thread;
use std::sync::{Arc, Barrier};
use std::time::{Instant};
let (warmup, measurement) = $pairs;
let producers = $producers;
let consumers = $consumers;
let plength = producers.len();
let clength = consumers.len();
let barrier = Arc::new(Barrier::new(plength + clength));
let pwranges = $crate::partition(plength, warmup).into_iter();
let pmranges = $crate::partition(plength, measurement).into_iter();
let pthreads = producers.into_iter().zip(pwranges).zip(pmranges).map(|((p, w), m)| {
let barrier = barrier.clone();
thread::spawn(move || {
barrier.wait();
for index in w { $produce(&p, index); }
m.map(|i| {
let start = Instant::now();
$produce(&p, i);
Instant::now() - start
}).collect::<Vec<_>>().into_iter()
})
}).collect::<Vec<_>>().into_iter();
let cwranges = $crate::partition(clength, warmup).into_iter();
let cmranges = $crate::partition(clength, measurement).into_iter();
let cthreads = consumers.into_iter().zip(cwranges).zip(cmranges).map(|((c, w), m)| {
let barrier = barrier.clone();
thread::spawn(move || {
barrier.wait();
for _ in w { while $consume(&c).is_none() { } }
m.map(|_| {
let start = Instant::now();
while $consume(&c).is_none() { }
Instant::now() - start
}).collect::<Vec<_>>().into_iter()
})
}).collect::<Vec<_>>().into_iter();
let produce = pthreads.flat_map(|t| t.join().unwrap().map($crate::nanoseconds)).collect();
let consume = cthreads.flat_map(|t| t.join().unwrap().map($crate::nanoseconds)).collect();
$crate::Latency::new(produce, consume)
});
}
#[macro_export]
macro_rules! queuecheck_bench_throughput {
($pairs:expr, $producers:expr, $consumers:expr, $produce:expr, $consume:expr) => ({
use std::thread;
use std::sync::{Arc, Barrier};
use std::time::{Duration, Instant};
let (warmup, measurement) = $pairs;
let producers = $producers;
let consumers = $consumers;
let plength = producers.len();
let clength = consumers.len();
let barrier = Arc::new(Barrier::new(plength + clength));
let pwranges = $crate::partition(plength, warmup).into_iter();
let pmranges = $crate::partition(plength, measurement).into_iter();
let pthreads = producers.into_iter().zip(pwranges).zip(pmranges).map(|((p, w), m)| {
let barrier = barrier.clone();
thread::spawn(move || {
barrier.wait();
for index in w { $produce(&p, index); }
let start = Instant::now();
for index in m { $produce(&p, index); }
Instant::now() - start
})
}).collect::<Vec<_>>().into_iter();
let cwranges = $crate::partition(clength, warmup).into_iter();
let cmranges = $crate::partition(clength, measurement).into_iter();
let cthreads = consumers.into_iter().zip(cwranges).zip(cmranges).map(|((c, w), m)| {
let barrier = barrier.clone();
thread::spawn(move || {
barrier.wait();
for _ in w { while $consume(&c).is_none() { } }
let start = Instant::now();
for _ in m { while $consume(&c).is_none() { } }
Instant::now() - start
})
}).collect::<Vec<_>>().into_iter();
let mut duration = Duration::default();
duration += pthreads.map(|t| t.join().unwrap()).sum();
duration += cthreads.map(|t| t.join().unwrap()).sum();
duration /= (clength + plength) as u32;
(measurement as f64 / $crate::nanoseconds(duration)) * 1_000_000_000.0
});
}
#[macro_export]
macro_rules! queuecheck_test {
($pairs:expr, $producers:expr, $consumers:expr, $produce:expr, $consume:expr) => ({
use std::thread;
use std::sync::{Arc, Barrier};
let pairs = $pairs;
let producers = $producers;
let consumers = $consumers;
let barrier = Arc::new(Barrier::new(producers.len() + consumers.len()));
let pranges = $crate::partition(producers.len(), pairs).into_iter();
let pthreads = producers.into_iter().zip(pranges).map(|(p, r)| {
let barrier = barrier.clone();
thread::spawn(move || {
barrier.wait();
for index in r { $produce(&p, index.to_string()); }
})
}).collect::<Vec<_>>();
let cranges = $crate::partition(consumers.len(), pairs).into_iter();
let cthreads = consumers.into_iter().zip(cranges).map(|(c, r)| {
let barrier = barrier.clone();
thread::spawn(move || {
barrier.wait();
let mut indices = Vec::with_capacity(r.len());
while indices.len() < r.len() {
if let Some(index) = $consume(&c) {
match index.parse::<usize>() {
Ok(index) => indices.push(index),
_ => panic!("invalid index string: {:?}", index),
}
}
}
indices
})
}).collect::<Vec<_>>();
for thread in pthreads { thread.join().unwrap(); }
let mut indices = Vec::with_capacity(pairs);
for thread in cthreads { indices.extend(thread.join().unwrap()); }
indices.sort();
let expected = (0..pairs).filter(|i| indices.binary_search(i).is_err()).collect::<Vec<_>>();
let unexpected = indices.iter().cloned().filter(|i| *i >= pairs).collect::<Vec<_>>();
if !expected.is_empty() || !unexpected.is_empty() {
panic!("dropped: {:?}, invalid: {:?}", expected, unexpected);
}
});
}
#[derive(Clone, Debug)]
pub struct Data(Vec<f64>);
impl Data {
pub fn percentile(&self, rank: f64) -> f64 {
assert!(rank >= 0.0 && rank <= 100.0, "`rank` must be in the range [0.0, 100.0]");
self.0[((self.0.len() - 1) as f64 * (rank / 100.0)) as usize]
}
fn report(&self, name: &str, ranks: &[f64]) {
println!(" {}", name);
for rank in ranks {
let name = format!("{}%:", rank);
println!(" {:<10} {}ns", name, thousands(self.percentile(*rank), 2));
}
}
}
#[derive(Clone, Debug)]
pub struct Latency {
pub produce: Data,
pub consume: Data,
}
impl Latency {
pub fn new(mut produce: Vec<f64>, mut consume: Vec<f64>) -> Self {
produce.sort_by(|a, b| a.partial_cmp(b).unwrap());
consume.sort_by(|a, b| a.partial_cmp(b).unwrap());
Latency { produce: Data(produce), consume: Data(consume) }
}
pub fn report(&self, name: &str, ranks: &[f64]) {
println!("{}", name);
self.produce.report("produce", ranks);
self.consume.report("consume", ranks);
}
}
fn thousands(number: f64, precision: usize) -> String {
let mut string = format!("{:.*}", precision, number);
let mut index = string.find('.').unwrap();
while index > 3 {
index -= 3;
string.insert(index, '_');
}
string
}
#[doc(hidden)]
pub fn nanoseconds(duration: Duration) -> f64 {
(duration.as_secs() * 1_000_000_000) as f64 + duration.subsec_nanos() as f64
}
#[doc(hidden)]
pub fn partition(threads: usize, operations: usize) -> Vec<Range<i32>> {
let factor = operations / threads;
(0..threads).map(|t| {
let end = if t + 1 == threads { operations } else { factor * (t + 1) };
((factor * t) as i32)..(end as i32)
}).collect()
}